Fix race condition between instance drop and participant history update (#2073)
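Skip participant history writes when the history node no longer exists, so that neither the
controller nor a disconnecting participant recreates the history node of an instance that has
just been dropped. The history node is now created up front when the instance is added.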
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 70acf31..f81f4b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -82,8 +82,8 @@
.asList(HelixConstants.ChangeType.EXTERNAL_VIEW,
HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW);
- private String _clusterName = AbstractDataCache.UNKNOWN_CLUSTER;
- private String _pipelineName = AbstractDataCache.UNKNOWN_PIPELINE;
+ private final String _clusterName;
+ private final String _pipelineName;
private String _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
private ClusterConfig _clusterConfig;
@@ -106,17 +106,17 @@
private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
// Special caches
- private CurrentStateCache _currentStateCache;
+ private final CurrentStateCache _currentStateCache;
protected TaskCurrentStateCache _taskCurrentStateCache;
- private InstanceMessagesCache _instanceMessagesCache;
+ private final InstanceMessagesCache _instanceMessagesCache;
// Other miscellaneous caches
private Map<String, Long> _instanceOfflineTimeMap;
private Map<String, Map<String, String>> _idealStateRuleMap;
- private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
- private Set<String> _disabledInstanceSet = new HashSet<>();
+ private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
+ private final Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
- private Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
+ private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
public BaseControllerDataProvider() {
@@ -777,12 +777,15 @@
PropertyKey propertyKey = keyBuilder.participantHistory(instance);
ParticipantHistory history = accessor.getProperty(propertyKey);
if (history == null) {
- history = new ParticipantHistory(instance);
+ // this means the instance has been removed, skip history update
+ continue;
}
if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) {
history.reportOffline();
- // persist history back to ZK.
- if (!accessor.setProperty(propertyKey, history)) {
+ // persist history back to ZK only if the node still exists
+ boolean succeed = accessor.updateProperty(propertyKey,
+ currentData -> (currentData == null) ? null : history.getRecord(), history);
+ if (!succeed) {
LogUtil.logError(logger, getClusterEventId(),
"Fails to persist participant online history back to ZK!");
}
@@ -822,6 +825,10 @@
long timeOutWindow) {
ParticipantHistory history =
accessor.getProperty(accessor.keyBuilder().participantHistory(instance));
+ if (history == null) {
+ LogUtil.logWarn(logger, getClusterEventId(), "Participant history is null for instance " + instance);
+ return false;
+ }
List<Long> onlineTimestamps = history.getOnlineTimestampsAsMilliseconds();
List<Long> offlineTimestamps = history.getOfflineTimestampsAsMilliseconds();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 09d994f..741e5c4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -351,7 +351,7 @@
ParticipantHistory history = getHistory();
history.reportOnline(_sessionId, _manager.getVersion());
- persistHistory(history);
+ persistHistory(history, false);
}
/**
@@ -494,15 +494,16 @@
private ParticipantHistory getHistory() {
PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
- if (history == null) {
- history = new ParticipantHistory(_instanceName);
- }
- return history;
+ return history == null ? new ParticipantHistory(_instanceName) : history;
}
- private void persistHistory(ParticipantHistory history) {
+ private void persistHistory(ParticipantHistory history, boolean skipOnEmptyPath) {
PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
- if (!_dataAccessor.setProperty(propertyKey, history)) {
+ boolean result = skipOnEmptyPath
+ ? _dataAccessor.updateProperty(
+ propertyKey, currentData -> (currentData == null) ? null : history.getRecord(), history)
+ : _dataAccessor.setProperty(propertyKey, history);
+ if (!result) {
LOG.error("Failed to persist participant history to zk!");
}
}
@@ -514,7 +515,7 @@
try {
ParticipantHistory history = getHistory();
history.reportOffline();
- persistHistory(history);
+ persistHistory(history, true);
} catch (Exception e) {
LOG.error("Failed to report participant offline.", e);
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 28a9e4b..673536a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -205,6 +205,10 @@
_zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true);
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId));
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 6485801..e15947b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -25,8 +25,6 @@
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -39,7 +37,7 @@
import org.slf4j.LoggerFactory;
public class MockParticipantManager extends ClusterManager {
- private static Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
protected int _transDelay = 10;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 050bc76..95d1f45 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -24,6 +24,9 @@
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
@@ -31,6 +34,7 @@
import javax.management.ObjectInstance;
import javax.management.ObjectName;
+import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -41,6 +45,7 @@
import org.apache.helix.TestHelper;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.common.ZkTestBase;
@@ -65,15 +70,17 @@
import org.testng.annotations.Test;
public class TestParticipantManager extends ZkTestBase {
- private MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
- private String clusterName = TestHelper.getTestClassName();
+ private final MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
+ private final String _clusterName = TestHelper.getTestClassName();
+ private final ExecutorService _executor = Executors.newFixedThreadPool(1);
+
static {
System.setProperty(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED, "true");
}
@AfterMethod
public void afterMethod(Method testMethod, ITestContext testContext) {
- deleteCluster(clusterName);
+ deleteCluster(_clusterName);
super.endTest(testMethod, testContext);
}
@@ -84,25 +91,24 @@
@Test
public void simpleIntegrationTest() throws Exception {
- int n = 1;
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
4, // partitions per resource
- n, // number of nodes
+ 1, // number of nodes
1, // replicas
"MasterSlave", true); // do rebalance
+ String instanceName = "localhost_12918";
HelixManager participant =
- new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
+ new ZKHelixManager(_clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
participant.connect();
HelixManager controller =
- new ZKHelixManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
+ new ZKHelixManager(_clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
controller.connect();
verifyHelixManagerMetrics(InstanceType.PARTICIPANT, MonitorLevel.DEFAULT,
@@ -111,27 +117,86 @@
controller.getInstanceName());
BestPossibleExternalViewVerifier verifier =
- new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+ new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(verifier.verifyByPolling());
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ParticipantHistory history = accessor.getProperty(keyBuilder.participantHistory(instanceName));
+ Assert.assertNotNull(history);
+ long historyModifiedTime = history.getRecord().getModifiedTime();
// cleanup
controller.disconnect();
participant.disconnect();
// verify all live-instances and leader nodes are gone
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+ Assert.assertTrue(
+ historyModifiedTime <
+ accessor.getProperty(keyBuilder.participantHistory(instanceName)).getRecord().getModifiedTime());
+ }
+
+ /**
+  * Races a simulated instance drop against the controller and participant disconnects, then
+  * checks that no participant history node exists for the dropped instance afterwards.
+  */
+ @Test(invocationCount = 5)
+ public void testParticipantHistoryWithInstanceDrop() throws Exception {
+   TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+       "localhost", // participant name prefix
+       "TestDB", // resource name prefix
+       1, // resources
+       4, // partitions per resource
+       1, // number of nodes
+       1, // replicas
+       "MasterSlave", true); // do rebalance
+
+   String instanceName = "localhost_12918";
+   HelixManager participant =
+       new ZKHelixManager(_clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
+   participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
+       new MockMSModelFactory());
+   participant.connect();
+
+   HelixManager controller =
+       new ZKHelixManager(_clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
+   controller.connect();
+   BestPossibleExternalViewVerifier verifier =
+       new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+           .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+           .build();
+   Assert.assertTrue(verifier.verifyByPolling());
+   ZKHelixDataAccessor accessor =
+       new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+   PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+   ParticipantHistory history = accessor.getProperty(keyBuilder.participantHistory(instanceName));
+   Assert.assertNotNull(history);
+
+   // simulate instance drop, running concurrently with the disconnects below
+   String instancePath = keyBuilder.instance(instanceName).toString();
+   Future<?> instanceDrop = _executor.submit(() -> {
+     while (!Thread.currentThread().isInterrupted()) {
+       try {
+         if (_baseAccessor.remove(instancePath, AccessOption.PERSISTENT)) {
+           return;
+         }
+       } catch (Exception e) {
+         // Removal failed on this attempt; fall through to the back-off and retry.
+       }
+       // Back off after every unsuccessful attempt, whether it threw or returned false,
+       // so the loop never spins hot.
+       try {
+         Thread.sleep(100);
+       } catch (InterruptedException ie) {
+         // Keep the interrupt flag set and stop retrying so cancel(true) really ends the task.
+         Thread.currentThread().interrupt();
+         return;
+       }
+     }
+   });
+   // cleanup
+   controller.disconnect();
+   participant.disconnect();
+   try {
+     instanceDrop.get(1000, TimeUnit.MILLISECONDS);
+   } finally {
+     // No-op once the task has completed. After a timeout it interrupts the retry loop so the
+     // shared single-thread executor is free for the next invocation.
+     instanceDrop.cancel(true);
+   }
+   // ensure the history node is never created after instance drop
+   Assert.assertNull(accessor.getProperty(keyBuilder.participantHistory(instanceName)));
}
@Test
public void simpleIntegrationTestNeg() throws Exception {
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
@@ -141,19 +206,19 @@
"MasterSlave", true); // do rebalance
ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
- ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(_clusterName);
clusterConfig.getRecord()
.setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(),
new ArrayList<>());
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/Rack/Sub-Rack/Host/Instance");
clusterConfig.setFaultZoneType("Host");
- configAccessor.setClusterConfig(clusterName, clusterConfig);
+ configAccessor.setClusterConfig(_clusterName, clusterConfig);
String instanceName = "localhost_12918";
HelixManager participant =
- new ZKHelixManager(clusterName, instanceName , InstanceType.PARTICIPANT, ZK_ADDR);
+ new ZKHelixManager(_clusterName, instanceName , InstanceType.PARTICIPANT, ZK_ADDR);
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
new MockMSModelFactory());
// We are expecting an IllegalArgumentException since the domain is not set.
@@ -167,16 +232,16 @@
// verify there is no live-instances created
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
}
@Test // (dependsOnMethods = "simpleIntegrationTest")
public void testMonitoringLevel() throws Exception {
int n = 1;
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
@@ -189,7 +254,7 @@
HelixManager participant;
try {
participant =
- new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
+ new ZKHelixManager(_clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
} finally {
System.clearProperty(SystemPropertyKeys.MONITOR_LEVEL);
}
@@ -209,15 +274,15 @@
String instanceName) throws MalformedObjectNameException {
// check HelixCallback Monitor
Set<ObjectInstance> objs =
- _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null);
+ _server.queryMBeans(buildCallbackMonitorObjectName(type, _clusterName, instanceName), null);
Assert.assertEquals(objs.size(), 19);
// check HelixZkClient Monitors
objs =
- _server.queryMBeans(buildZkClientMonitorObjectName(type, clusterName, instanceName), null);
+ _server.queryMBeans(buildZkClientMonitorObjectName(type, _clusterName, instanceName), null);
Assert.assertEquals(objs.size(), 1);
- objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, clusterName, instanceName),
+ objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, _clusterName, instanceName),
null);
int expectedZkPathMonitor;
@@ -262,7 +327,7 @@
MockParticipantManager[] participants = new MockParticipantManager[n];
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
@@ -273,18 +338,18 @@
// start controller
ClusterControllerManager controller =
- new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
participants[i].syncStart();
}
BestPossibleExternalViewVerifier verifier =
- new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+ new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(verifier.verifyByPolling());
@@ -339,7 +404,7 @@
MockParticipantManager[] participants = new MockParticipantManager[n];
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
@@ -350,13 +415,13 @@
// start controller
ClusterControllerManager controller =
- new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
// start participants
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
participants[i].setTransition(new SessionExpiryTransition(startCountdown, endCountdown));
participants[i].syncStart();
}
@@ -367,7 +432,7 @@
ZkTestHelper.expireSession(participants[0].getZkClient());
BestPossibleExternalViewVerifier verifier =
- new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+ new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(verifier.verifyByPolling());
@@ -376,7 +441,7 @@
Assert.assertNotSame(newSessionId, oldSessionId);
// assert interrupt exception error in old session
- String errPath = PropertyPathBuilder.instanceError(clusterName, "localhost_12918", oldSessionId,
+ String errPath = PropertyPathBuilder.instanceError(_clusterName, "localhost_12918", oldSessionId,
"TestDB0", "TestDB0_0");
ZNRecord error = _gZkClient.readData(errPath);
Assert.assertNotNull(error,