Avoid redundant writes in AssignmentMetadataStore (#564)

For the WAGED rebalancer, we persist the cluster's mapping via AssignmentMetadataStore every pipeline. However, if there are no changes made to the new assignment from the old assignment, this write is not necessary. This diff checks whether they are equal and skips the write if old and new assignments are the same.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index a540ffb..234c88c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -94,6 +94,10 @@
 
   public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     // TODO: Make the write async?
+    // If baseline hasn't changed, skip writing to metadata store
+    if (compareAssignments(_globalBaseline, globalBaseline)) {
+      return;
+    }
     // Persist to ZK
     HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
     try {
@@ -110,14 +114,18 @@
   public void persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // TODO: Make the write async?
-    // Persist to ZK asynchronously
+    // If bestPossibleAssignment hasn't changed, skip writing to metadata store
+    if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+      return;
+    }
+    // Persist to ZK
     HelixProperty combinedAssignments =
         combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
     try {
       _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
     } catch (IOException e) {
       // TODO: Improve failure handling
-      throw new HelixException("Failed to persist baseline!", e);
+      throw new HelixException("Failed to persist BestPossibleAssignment!", e);
     }
 
     // Update the in-memory reference
@@ -164,4 +172,17 @@
             new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
     return assignmentMap;
   }
+
+  /**
+   * Returns whether two assignments are same.
+   * @param oldAssignment
+   * @param newAssignment
+   * @return true if they are the same. False otherwise or oldAssignment is null
+   */
+  private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
+      Map<String, ResourceAssignment> newAssignment) {
+    // If oldAssignment is null, that means that we haven't read from/written to
+    // the metadata store yet. In that case, we return false so that we write to metadata store.
+    return oldAssignment != null && oldAssignment.equals(newAssignment);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index ecd2af3..59326e7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -19,20 +19,25 @@
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 public class TestAssignmentMetadataStore extends ZkTestBase {
   protected static final int NODE_NR = 5;
   protected static final int START_PORT = 12918;
@@ -51,7 +56,8 @@
   private AssignmentMetadataStore _store;
 
   @BeforeClass
-  public void beforeClass() throws Exception {
+  public void beforeClass()
+      throws Exception {
     super.beforeClass();
 
     // setup storage cluster
@@ -76,8 +82,8 @@
     _controller.syncStart();
 
     // create cluster manager
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
 
     // create AssignmentMetadataStore
@@ -104,4 +110,56 @@
     Map<String, ResourceAssignment> baseline = _store.getBaseline();
     Assert.assertTrue(baseline.isEmpty());
   }
+
+  /**
+   * Test that if the old assignment and new assignment are the same,
+   */
+  @Test(dependsOnMethods = "testReadEmptyBaseline")
+  public void testAvoidingRedundantWrite() {
+    String baselineKey = "BASELINE";
+    String bestPossibleKey = "BEST_POSSIBLE";
+
+    // Generate a dummy assignment
+    Map<String, ResourceAssignment> dummyAssignment = new HashMap<>();
+    ResourceAssignment assignment = new ResourceAssignment(TEST_DB);
+    Partition partition = new Partition(TEST_DB);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(TEST_DB, TEST_DB);
+    assignment.addReplicaMap(partition, replicaMap);
+    dummyAssignment.put(TEST_DB, new ResourceAssignment(TEST_DB));
+
+    // Call persist functions
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    // Check that only one version exists
+    List<String> baselineVersions = getExistingVersionNumbers(baselineKey);
+    List<String> bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
+    Assert.assertEquals(baselineVersions.size(), 1);
+    Assert.assertEquals(bestPossibleVersions.size(), 1);
+
+    // Call persist functions again
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    // Check that only one version exists still
+    baselineVersions = getExistingVersionNumbers(baselineKey);
+    bestPossibleVersions = getExistingVersionNumbers(bestPossibleKey);
+    Assert.assertEquals(baselineVersions.size(), 1);
+    Assert.assertEquals(bestPossibleVersions.size(), 1);
+  }
+
+  /**
+   * Returns a list of existing version numbers only.
+   * @param metadataType
+   * @return
+   */
+  private List<String> getExistingVersionNumbers(String metadataType) {
+    List<String> children = _baseAccessor
+        .getChildNames("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/" + metadataType,
+            AccessOption.PERSISTENT);
+    children.remove("LAST_SUCCESSFUL_WRITE");
+    children.remove("LAST_WRITE");
+    return children;
+  }
 }