deserialize load report based on load-manager (#338)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
index 2cadbef..8345e35 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
@@ -27,6 +27,8 @@
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
@@ -56,6 +58,13 @@
* Generate the load report
*/
LoadReport generateLoadReport() throws Exception;
+
+ /**
+ * Returns {@link Deserializer} to deserialize load report
+ *
+ * @return
+ */
+ Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
/**
* Set flag to force load report update
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
index 14268d5..e4f0fff 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,6 +18,8 @@
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* New proposal for a load manager interface which attempts to use more intuitive method names and provide a starting
@@ -88,4 +90,11 @@
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
void writeBundleDataOnZooKeeper();
+
+ /**
+ * Return :{@link Deserializer} to deserialize load-manager load report
+ *
+ * @return
+ */
+ Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d4ef9e8..453c19a 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -16,6 +16,7 @@
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -66,6 +67,7 @@
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
@@ -156,6 +158,9 @@
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;
+
+ private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
+ .readValue(content, LocalBrokerData.class);
/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
@@ -556,6 +561,11 @@
}
}
+ @Override
+ public Deserializer<LocalBrokerData> getLoadReportDeserializer() {
+ return loadReportDeserializer;
+ }
+
/**
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 498083a..e515d1c 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -26,6 +26,8 @@
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
@@ -103,4 +105,9 @@
public void writeResourceQuotasToZooKeeper() {
loadManager.writeBundleDataOnZooKeeper();
}
+
+ @Override
+ public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
+ return loadManager.getLoadReportDeserializer();
+ }
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 79c6bdf..42f4dac 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -75,9 +75,11 @@
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
+import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {
@@ -176,6 +178,8 @@
private long lastResourceUsageTimestamp = -1;
// flag to force update load report
private boolean forceLoadReportUpdate = false;
+ private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
+ .readValue(content, LoadReport.class);
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
@@ -315,6 +319,11 @@
}
}
+ @Override
+ public Deserializer<LoadReport> getLoadReportDeserializer() {
+ return loadReportDeserializer;
+ }
+
public ZooKeeperChildrenCache getActiveBrokersCache() {
return this.availableActiveBrokers;
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
index c14ce26..41bf749 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
@@ -150,10 +150,6 @@
return bundleFactory.getFullBundle(fqnn);
}
- private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) ->
- jsonMapper().readValue(content, ServiceLookupData.class);
-
-
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
@@ -399,7 +395,7 @@
}
}
- private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
+ protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
@@ -407,7 +403,7 @@
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
- pulsar.getLocalZkCache().getDataAsync(path, serviceLookupDataDeserializer).thenAccept(reportData -> {
+ pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
if (reportData.isPresent()) {
ServiceLookupData lookupData = reportData.get();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
index 0ea8290..cf68882 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -31,14 +31,19 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -49,9 +54,18 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+import com.yahoo.pulsar.broker.LocalBrokerData;
+import com.yahoo.pulsar.broker.PulsarServerException;
+import com.yahoo.pulsar.broker.PulsarService;
+import com.yahoo.pulsar.broker.loadbalance.LoadManager;
+import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
+import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
@@ -62,9 +76,13 @@
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
+import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.Policies;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
public class NamespaceServiceTest extends BrokerTestBase {
@@ -279,7 +297,43 @@
// ok
}
}
-
+
+ /**
+ * <pre>
+ * It verifies that namespace service deserialize the load-report based on load-manager which active.
+ * 1. write candidate1- load report using {@link LoadReport} which is used by SimpleLoadManagerImpl
+ * 2. Write candidate2- load report using {@link LocalBrokerData} which is used by ModularLoadManagerImpl
+ * 3. try to get Lookup Result based on active load-manager
+ * </pre>
+ * @throws Exception
+ */
+ @Test
+ public void testLoadReportDeserialize() throws Exception {
+
+ final String candidateBroker1 = "http://localhost:8000";
+ final String candidateBroker2 = "http://localhost:3000";
+ LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
+ LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
+ URI uri1 = new URI(candidateBroker1);
+ URI uri2 = new URI(candidateBroker2);
+ String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
+ String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
+ ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path1,
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
+ ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
+ LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();
+
+ // update to new load mananger
+ pulsar.getLoadManager().set(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
+ LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2).get();
+ Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
+ Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
+ System.out.println(result2);
+ }
+
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {