Fix race condition between instance drop and participant history update (#2073)

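Fix race condition between instance drop and participant history update.

Previously, both the controller and the participant created a new
participant history record whenever none was found and wrote it back when
reporting an instance offline, so the history node could reappear after the
instance had been dropped. With this change:
- ZKHelixAdmin writes an initial participant history when it creates the
  instance paths;
- the controller skips the offline history update when no history is found,
  and otherwise persists it only if the node still exists;
- the participant persists its offline report only if the node still exists
  (the online report still creates the node if needed);
- the controller's time-out-window check returns false, with a warning, when
  the history is missing.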
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 70acf31..f81f4b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -82,8 +82,8 @@
       .asList(HelixConstants.ChangeType.EXTERNAL_VIEW,
           HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW);
 
-  private String _clusterName = AbstractDataCache.UNKNOWN_CLUSTER;
-  private String _pipelineName = AbstractDataCache.UNKNOWN_PIPELINE;
+  private final String _clusterName;
+  private final String _pipelineName;
   private String _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
   private ClusterConfig _clusterConfig;
 
@@ -106,17 +106,17 @@
   private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
 
   // Special caches
-  private CurrentStateCache _currentStateCache;
+  private final CurrentStateCache _currentStateCache;
   protected TaskCurrentStateCache _taskCurrentStateCache;
-  private InstanceMessagesCache _instanceMessagesCache;
+  private final InstanceMessagesCache _instanceMessagesCache;
 
   // Other miscellaneous caches
   private Map<String, Long> _instanceOfflineTimeMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
-  private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
-  private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
+  private final Set<String> _disabledInstanceSet = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
-  private Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
+  private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
   private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
 
   public BaseControllerDataProvider() {
@@ -777,12 +777,15 @@
       PropertyKey propertyKey = keyBuilder.participantHistory(instance);
       ParticipantHistory history = accessor.getProperty(propertyKey);
       if (history == null) {
-        history = new ParticipantHistory(instance);
+        // this means the instance has been removed, skip history update
+        continue;
       }
       if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) {
         history.reportOffline();
-        // persist history back to ZK.
-        if (!accessor.setProperty(propertyKey, history)) {
+        // persist history back to ZK only if the node still exists
+        boolean succeed = accessor.updateProperty(propertyKey,
+            currentData -> (currentData == null) ? null : history.getRecord(), history);
+        if (!succeed) {
           LogUtil.logError(logger, getClusterEventId(),
               "Fails to persist participant online history back to ZK!");
         }
@@ -822,6 +825,10 @@
       long timeOutWindow) {
     ParticipantHistory history =
         accessor.getProperty(accessor.keyBuilder().participantHistory(instance));
+    if (history == null) {
+      LogUtil.logWarn(logger, getClusterEventId(), "Participant history is null for instance " + instance);
+      return false;
+    }
     List<Long> onlineTimestamps = history.getOnlineTimestampsAsMilliseconds();
     List<Long> offlineTimestamps = history.getOfflineTimestampsAsMilliseconds();
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 09d994f..741e5c4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -351,7 +351,7 @@
 
     ParticipantHistory history = getHistory();
     history.reportOnline(_sessionId, _manager.getVersion());
-    persistHistory(history);
+    persistHistory(history, false);
   }
 
   /**
@@ -494,15 +494,16 @@
   private ParticipantHistory getHistory() {
     PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
     ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
-    if (history == null) {
-      history = new ParticipantHistory(_instanceName);
-    }
-    return history;
+    return history == null ? new ParticipantHistory(_instanceName) : history;
   }
 
-  private void persistHistory(ParticipantHistory history) {
+  private void persistHistory(ParticipantHistory history, boolean skipOnEmptyPath) {
     PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
-    if (!_dataAccessor.setProperty(propertyKey, history)) {
+    boolean result = skipOnEmptyPath
+        ? _dataAccessor.updateProperty(
+            propertyKey, currentData -> (currentData == null) ? null : history.getRecord(), history)
+        : _dataAccessor.setProperty(propertyKey, history);
+    if (!result) {
       LOG.error("Failed to persist participant history to zk!");
     }
   }
@@ -514,7 +515,7 @@
     try {
       ParticipantHistory history = getHistory();
       history.reportOffline();
-      persistHistory(history);
+      persistHistory(history, true);
     } catch (Exception e) {
       LOG.error("Failed to report participant offline.", e);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 28a9e4b..673536a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -205,6 +205,10 @@
     _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId));
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 6485801..e15947b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -25,8 +25,6 @@
 import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
 import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -39,7 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 public class MockParticipantManager extends ClusterManager {
-  private static Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
 
   protected int _transDelay = 10;
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 050bc76..95d1f45 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -24,6 +24,9 @@
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.MBeanServer;
@@ -31,6 +34,7 @@
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -41,6 +45,7 @@
 import org.apache.helix.TestHelper;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -65,15 +70,17 @@
 import org.testng.annotations.Test;
 
 public class TestParticipantManager extends ZkTestBase {
-  private MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
-  private String clusterName = TestHelper.getTestClassName();
+  private final MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
+  private final String _clusterName = TestHelper.getTestClassName();
+  private final ExecutorService _executor = Executors.newFixedThreadPool(1);
+
   static {
     System.setProperty(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED, "true");
   }
 
   @AfterMethod
   public void afterMethod(Method testMethod, ITestContext testContext) {
-    deleteCluster(clusterName);
+    deleteCluster(_clusterName);
     super.endTest(testMethod, testContext);
   }
 
@@ -84,25 +91,24 @@
 
   @Test
   public void simpleIntegrationTest() throws Exception {
-    int n = 1;
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
         4, // partitions per resource
-        n, // number of nodes
+        1, // number of nodes
         1, // replicas
         "MasterSlave", true); // do rebalance
 
+    String instanceName = "localhost_12918";
     HelixManager participant =
-        new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
+        new ZKHelixManager(_clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
     participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
         new MockMSModelFactory());
     participant.connect();
 
     HelixManager controller =
-        new ZKHelixManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
+        new ZKHelixManager(_clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
     controller.connect();
 
     verifyHelixManagerMetrics(InstanceType.PARTICIPANT, MonitorLevel.DEFAULT,
@@ -111,27 +117,86 @@
         controller.getInstanceName());
 
     BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ParticipantHistory history = accessor.getProperty(keyBuilder.participantHistory(instanceName));
+    Assert.assertNotNull(history);
+    long historyModifiedTime = history.getRecord().getModifiedTime();
 
     // cleanup
     controller.disconnect();
     participant.disconnect();
 
     // verify all live-instances and leader nodes are gone
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+    Assert.assertTrue(
+        historyModifiedTime <
+            accessor.getProperty(keyBuilder.participantHistory(instanceName)).getRecord().getModifiedTime());
+  }
+
+  @Test(invocationCount = 5)
+  public void testParticipantHistoryWithInstanceDrop() throws Exception {
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        4, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String instanceName = "localhost_12918";
+    HelixManager participant =
+        new ZKHelixManager(_clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
+    participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
+        new MockMSModelFactory());
+    participant.connect();
+
+    HelixManager controller =
+        new ZKHelixManager(_clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
+    controller.connect();
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+            .build();
+    Assert.assertTrue(verifier.verifyByPolling());
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ParticipantHistory history = accessor.getProperty(keyBuilder.participantHistory(instanceName));
+    Assert.assertNotNull(history);
+    // Drop the instance from a background thread so it races with the disconnects below.
+    Future<?> instanceDrop = _executor.submit(() -> {
+      boolean succeed = false;
+      while (!succeed && !Thread.currentThread().isInterrupted()) {
+        try {
+          // simulate instance drop; keep retrying if the removal throws
+          succeed = _baseAccessor.remove(keyBuilder.instance(instanceName).toString(), AccessOption.PERSISTENT);
+        } catch (Exception e) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
+        }
+      }
+    });
+    // cleanup, running concurrently with the instance drop submitted above
+    controller.disconnect();
+    participant.disconnect();
+    instanceDrop.get(1000, TimeUnit.MILLISECONDS);
+    // ensure the history node is never created after instance drop
+    Assert.assertNull(accessor.getProperty(keyBuilder.participantHistory(instanceName)));
+  }
 
   @Test
   public void simpleIntegrationTestNeg() throws Exception {
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -141,19 +206,19 @@
         "MasterSlave", true); // do rebalance
 
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_clusterName);
     clusterConfig.getRecord()
         .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(),
             new ArrayList<>());
     clusterConfig.setTopologyAwareEnabled(true);
     clusterConfig.setTopology("/Rack/Sub-Rack/Host/Instance");
     clusterConfig.setFaultZoneType("Host");
-    configAccessor.setClusterConfig(clusterName, clusterConfig);
+    configAccessor.setClusterConfig(_clusterName, clusterConfig);
 
 
     String instanceName = "localhost_12918";
     HelixManager participant =
-        new ZKHelixManager(clusterName, instanceName , InstanceType.PARTICIPANT, ZK_ADDR);
+        new ZKHelixManager(_clusterName, instanceName , InstanceType.PARTICIPANT, ZK_ADDR);
     participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
         new MockMSModelFactory());
     // We are expecting an IllegalArgumentException since the domain is not set.
@@ -167,16 +232,16 @@
 
     // verify there is no live-instances created
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+        new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
   }
 
   @Test // (dependsOnMethods = "simpleIntegrationTest")
   public void testMonitoringLevel() throws Exception {
     int n = 1;
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -189,7 +254,7 @@
     HelixManager participant;
     try {
       participant =
-          new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
+          new ZKHelixManager(_clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
     } finally {
       System.clearProperty(SystemPropertyKeys.MONITOR_LEVEL);
     }
@@ -209,15 +274,15 @@
       String instanceName) throws MalformedObjectNameException {
     // check HelixCallback Monitor
     Set<ObjectInstance> objs =
-        _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null);
+        _server.queryMBeans(buildCallbackMonitorObjectName(type, _clusterName, instanceName), null);
     Assert.assertEquals(objs.size(), 19);
 
     // check HelixZkClient Monitors
     objs =
-        _server.queryMBeans(buildZkClientMonitorObjectName(type, clusterName, instanceName), null);
+        _server.queryMBeans(buildZkClientMonitorObjectName(type, _clusterName, instanceName), null);
     Assert.assertEquals(objs.size(), 1);
 
-    objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, clusterName, instanceName),
+    objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, _clusterName, instanceName),
         null);
 
     int expectedZkPathMonitor;
@@ -262,7 +327,7 @@
 
     MockParticipantManager[] participants = new MockParticipantManager[n];
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -273,18 +338,18 @@
 
     // start controller
     ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
       participants[i].syncStart();
     }
 
     BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
@@ -339,7 +404,7 @@
 
     MockParticipantManager[] participants = new MockParticipantManager[n];
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -350,13 +415,13 @@
 
     // start controller
     ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
       participants[i].setTransition(new SessionExpiryTransition(startCountdown, endCountdown));
       participants[i].syncStart();
     }
@@ -367,7 +432,7 @@
     ZkTestHelper.expireSession(participants[0].getZkClient());
 
     BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
@@ -376,7 +441,7 @@
     Assert.assertNotSame(newSessionId, oldSessionId);
 
     // assert interrupt exception error in old session
-    String errPath = PropertyPathBuilder.instanceError(clusterName, "localhost_12918", oldSessionId,
+    String errPath = PropertyPathBuilder.instanceError(_clusterName, "localhost_12918", oldSessionId,
         "TestDB0", "TestDB0_0");
     ZNRecord error = _gZkClient.readData(errPath);
     Assert.assertNotNull(error,