deserialize load report based on load-manager (#338)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
index 2cadbef..8345e35 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
@@ -27,6 +27,8 @@
 import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 /**
  * LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
@@ -56,6 +58,13 @@
      * Generate the load report
      */
     LoadReport generateLoadReport() throws Exception;
+    
+    /**
+     * Returns {@link Deserializer} to deserialize load report 
+     * 
+     * @return
+     */
+    Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
 
     /**
      * Set flag to force load report update
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
index 14268d5..e4f0fff 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,6 +18,8 @@
 import com.yahoo.pulsar.broker.PulsarServerException;
 import com.yahoo.pulsar.broker.PulsarService;
 import com.yahoo.pulsar.common.naming.ServiceUnitId;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 /**
  * New proposal for a load manager interface which attempts to use more intuitive method names and provide a starting
@@ -88,4 +90,11 @@
      * As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
      */
     void writeBundleDataOnZooKeeper();
+
+    /**
+     * Return :{@link Deserializer} to deserialize load-manager load report
+     * 
+     * @return
+     */
+    Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
 }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d4ef9e8..453c19a 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -16,6 +16,7 @@
 package com.yahoo.pulsar.broker.loadbalance.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -66,6 +67,7 @@
 import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
 import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
 import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
 import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
 import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
@@ -156,6 +158,9 @@
 
     // ZooKeeper belonging to the pulsar service.
     private ZooKeeper zkClient;
+    
+    private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
+            .readValue(content, LocalBrokerData.class);
 
     /**
      * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
@@ -556,6 +561,11 @@
         }
     }
 
+    @Override
+    public Deserializer<LocalBrokerData> getLoadReportDeserializer() {
+        return loadReportDeserializer;
+    }
+
     /**
      * As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
      */
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 498083a..e515d1c 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -26,6 +26,8 @@
 import com.yahoo.pulsar.broker.stats.Metrics;
 import com.yahoo.pulsar.common.naming.ServiceUnitId;
 import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 /**
  * Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
@@ -103,4 +105,9 @@
     public void writeResourceQuotasToZooKeeper() {
         loadManager.writeBundleDataOnZooKeeper();
     }
+
+    @Override
+    public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
+        return loadManager.getLoadReportDeserializer();
+    }
 }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 79c6bdf..42f4dac 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -75,9 +75,11 @@
 import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
 import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
 import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
 import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
 import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
+import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
 
 public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {
 
@@ -176,6 +178,8 @@
     private long lastResourceUsageTimestamp = -1;
     // flag to force update load report
     private boolean forceLoadReportUpdate = false;
+    private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
+            .readValue(content, LoadReport.class); 
 
     // Perform initializations which may be done without a PulsarService.
     public SimpleLoadManagerImpl() {
@@ -315,6 +319,11 @@
         }
     }
 
+    @Override
+    public Deserializer<LoadReport> getLoadReportDeserializer() {
+        return loadReportDeserializer;
+    }
+
     public ZooKeeperChildrenCache getActiveBrokersCache() {
         return this.availableActiveBrokers;
     }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
index c14ce26..41bf749 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java
@@ -150,10 +150,6 @@
         return bundleFactory.getFullBundle(fqnn);
     }
 
-    private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) ->
-            jsonMapper().readValue(content, ServiceLookupData.class);
-
-
 	public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
 			throws Exception {
         if (suName instanceof DestinationName) {
@@ -399,7 +395,7 @@
         }
     }
 
-    private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
+    protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
 
         CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
         try {
@@ -407,7 +403,7 @@
             URI uri = new URI(candidateBroker);
             String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
                     uri.getPort());
-            pulsar.getLocalZkCache().getDataAsync(path, serviceLookupDataDeserializer).thenAccept(reportData -> {
+            pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
                 if (reportData.isPresent()) {
                     ServiceLookupData lookupData = reportData.get();
                     lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
index 0ea8290..cf68882 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -31,14 +31,19 @@
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -49,9 +54,18 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
+import com.yahoo.pulsar.broker.LocalBrokerData;
+import com.yahoo.pulsar.broker.PulsarServerException;
+import com.yahoo.pulsar.broker.PulsarService;
+import com.yahoo.pulsar.broker.loadbalance.LoadManager;
+import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
+import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import com.yahoo.pulsar.broker.lookup.LookupResult;
 import com.yahoo.pulsar.broker.service.BrokerTestBase;
 import com.yahoo.pulsar.broker.service.Topic;
 import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
@@ -62,9 +76,13 @@
 import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
 import com.yahoo.pulsar.common.naming.NamespaceBundles;
 import com.yahoo.pulsar.common.naming.NamespaceName;
+import com.yahoo.pulsar.common.naming.ServiceUnitId;
 import com.yahoo.pulsar.common.policies.data.Policies;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
 import com.yahoo.pulsar.common.util.ObjectMapperFactory;
 import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 public class NamespaceServiceTest extends BrokerTestBase {
 
@@ -279,7 +297,43 @@
             // ok
         }
     }
-    
+
+    /**
+     * <pre>
+     *  It verifies that namespace service deserialize the load-report based on load-manager which active.
+     *  1. write candidate1- load report using {@link LoadReport} which is used by SimpleLoadManagerImpl
+     *  2. Write candidate2- load report using {@link LocalBrokerData} which is used by ModularLoadManagerImpl
+     *  3. try to get Lookup Result based on active load-manager
+     * </pre>
+     * @throws Exception
+     */
+    @Test
+    public void testLoadReportDeserialize() throws Exception {
+
+        final String candidateBroker1 = "http://localhost:8000";
+        final String candidateBroker2 = "http://localhost:3000";
+        LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
+        LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
+        URI uri1 = new URI(candidateBroker1);
+        URI uri2 = new URI(candidateBroker2);
+        String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
+        String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
+        ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path1,
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();
+
+        // update to new load mananger
+        pulsar.getLoadManager().set(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
+        LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2).get();
+        Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
+        Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
+        System.out.println(result2);
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {