blob: b7a001424bfa8fc3f9dbc991a3bc6a7d68a91441 [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to handle all session related work for a participant.
*/
/**
 * Class to handle all session related work for a participant.
 * <p>
 * One instance is created per ZooKeeper session: the session id is captured at construction time
 * and kept immutable so that an expired or unexpected session can never be used to create this
 * participant's ephemeral nodes.
 */
public class ParticipantManager {
  private static final Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);

  // Package prefix under which cloud-provider-specific information processors live; the concrete
  // class name is composed from this prefix + lower-cased provider name + processor simple name.
  private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";

  final RealmAwareZkClient _zkclient;
  final HelixManager _manager;
  final PropertyKey.Builder _keyBuilder;
  final String _clusterName;
  final String _instanceName;
  final int _sessionTimeout;
  final ConfigAccessor _configAccessor;
  final InstanceType _instanceType;
  final HelixAdmin _helixAdmin;
  final ZKHelixDataAccessor _dataAccessor;
  final DefaultMessagingService _messagingService;
  final StateMachineEngine _stateMachineEngine;
  final LiveInstanceInfoProvider _liveInstanceInfoProvider;
  final List<PreConnectCallback> _preConnectCallbacks;
  final HelixManagerProperty _helixManagerProperty;

  // zk session id should be immutable after participant manager is created. This is to avoid
  // session race condition when handling new session for the participant.
  private final String _sessionId;

  /**
   * @deprecated use the constructor that also takes a {@link HelixManagerProperty}; this overload
   *             forwards with a {@code null} property (cloud auto-registration is then disabled).
   */
  @Deprecated
  public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
      LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
      final String sessionId) {
    this(manager, zkclient, sessionTimeout, liveInstanceInfoProvider, preConnectCallbacks,
        sessionId, null);
  }

  /**
   * Creates a participant manager bound to a single zk session.
   * @param manager the owning Helix manager (supplies cluster/instance names and accessors)
   * @param zkclient zk client used for ephemeral node creation and raw reads
   * @param sessionTimeout zk session timeout in milliseconds (used when waiting out a stale
   *          live-instance from a previous session)
   * @param liveInstanceInfoProvider optional provider of extra fields merged into the live instance
   * @param preConnectCallbacks callbacks invoked after joining the cluster, before the live
   *          instance is created
   * @param sessionId the hex zk session id this manager is bound to; immutable afterwards
   * @param helixManagerProperty optional properties carrying cloud configuration; may be null
   */
  public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
      LiveInstanceInfoProvider liveInstanceInfoProvider,
      List<PreConnectCallback> preConnectCallbacks, final String sessionId,
      HelixManagerProperty helixManagerProperty) {
    _zkclient = zkclient;
    _manager = manager;
    _clusterName = manager.getClusterName();
    _instanceName = manager.getInstanceName();
    _keyBuilder = new PropertyKey.Builder(_clusterName);
    _sessionId = sessionId;
    _sessionTimeout = sessionTimeout;
    _configAccessor = manager.getConfigAccessor();
    _instanceType = manager.getInstanceType();
    _helixAdmin = manager.getClusterManagmentTool();
    _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
    _messagingService = (DefaultMessagingService) manager.getMessagingService();
    _stateMachineEngine = manager.getStateMachineEngine();
    _liveInstanceInfoProvider = liveInstanceInfoProvider;
    _preConnectCallbacks = preConnectCallbacks;
    _helixManagerProperty = helixManagerProperty;
  }

  /**
   * Handles a new session for a participant. The new session's id is passed in when participant
   * manager is created, as it is required to prevent ephemeral node creation from session race
   * condition: ephemeral node is created by an expired or unexpected session.
   * @throws Exception if the zk client's current session no longer matches this manager's session,
   *           or any step of joining/registering fails
   */
  public void handleNewSession() throws Exception {
    // Check zk session of this participant is still valid.
    // If not, skip handling new session for this participant.
    final String zkClientHexSession = ZKUtil.toHexSessionId(_zkclient.getSessionId());
    if (!zkClientHexSession.equals(_sessionId)) {
      throw new HelixException(
          "Failed to handle new session for participant. There is a session mismatch: "
              + "participant manager session = " + _sessionId + ", zk client session = "
              + zkClientHexSession);
    }

    joinCluster();

    // Invoke PreConnectCallbacks before the live instance becomes visible to the controller.
    for (PreConnectCallback callback : _preConnectCallbacks) {
      callback.onPreConnect();
    }

    // TODO create live instance node after all the init works done --JJ
    // This will help to prevent controller from sending any message prematurely.
    // Live instance creation also checks if the expected session is valid or not. Live instance
    // should not be created by an expired zk session.
    createLiveInstance();

    if (shouldCarryOver()) {
      carryOverPreviousCurrentState(_dataAccessor, _instanceName, _sessionId,
          _manager.getStateMachineEngine(), true);
    }
    removePreviousTaskCurrentStates();

    // Setup message listener so the controller can start sending state transitions.
    setupMsgHandler();
  }

  /**
   * Decides whether current states from previous sessions should be carried over.
   * Carry-over is skipped only when the live-instance info provider reports a PAUSED (frozen)
   * status; with no provider or no status field, carry-over proceeds.
   */
  private boolean shouldCarryOver() {
    if (_liveInstanceInfoProvider == null
        || _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo() == null) {
      return true;
    }
    String status = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo()
        .getSimpleField(LiveInstance.LiveInstanceProperty.STATUS.name());
    // If frozen (PAUSED), no carry-over
    return !LiveInstance.LiveInstanceStatus.PAUSED.name().equals(status);
  }

  /**
   * Ensures this instance is registered in the cluster. If the cluster structure for the instance
   * is missing, the instance may auto-join (plain InstanceConfig) or, when cloud support is
   * enabled, auto-register (InstanceConfig populated with the cloud fault domain). In both the
   * new and existing cases the instance's topology settings are validated against cluster config.
   */
  private void joinCluster() {
    // Read cluster config and see if an instance can auto join or auto register to the cluster
    boolean autoJoin = false;
    boolean autoRegistration = false;

    // Read "allowParticipantAutoJoin" field to see if an instance can auto join to the cluster
    try {
      HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER)
          .forCluster(_manager.getClusterName()).build();
      autoJoin = Boolean
          .parseBoolean(_configAccessor.get(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
      LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
    } catch (Exception e) {
      // Best effort: a missing/unreadable cluster config simply means no auto-join.
      LOG.info("auto join is false for cluster: " + _clusterName);
    }

    // Read cloud config and see if an instance can auto register to the cluster
    // Difference between auto join and auto registration is that the latter will also populate the
    // domain information in instance config
    try {
      // _helixManagerProperty may be null (deprecated constructor); the NPE is caught below and
      // auto registration stays disabled.
      autoRegistration =
          Boolean.valueOf(_helixManagerProperty.getHelixCloudProperty().getCloudEnabled());
      LOG.info("instance: " + _instanceName + " auto-registering " + _clusterName + " is "
          + autoRegistration);
    } catch (Exception e) {
      LOG.info("auto registration is false for cluster: " + _clusterName);
    }

    InstanceConfig instanceConfig;
    if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
      if (!autoJoin) {
        throw new HelixException("Initial cluster structure is not set up for instance: "
            + _instanceName + ", instanceType: " + _instanceType);
      }
      if (!autoRegistration) {
        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
        instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
      } else {
        LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
        CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
        String domain = cloudInstanceInformation
            .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + _instanceName;
        instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
        instanceConfig.setDomain(domain);
      }
      instanceConfig
          .validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
              _instanceName);
      _helixAdmin.addInstance(_clusterName, instanceConfig);
    } else {
      _configAccessor.getInstanceConfig(_clusterName, _instanceName)
          .validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
              _instanceName);
    }
  }

  /**
   * Reflectively instantiates the provider-specific
   * {@link CloudInstanceInformationProcessor} and uses it to fetch and parse this instance's
   * cloud information (e.g. fault domain).
   * @throws HelixException if the processor class cannot be found or instantiated
   */
  private CloudInstanceInformation getCloudInstanceInformation() {
    String cloudInstanceInformationProcessorName =
        _helixManagerProperty.getHelixCloudProperty().getCloudInfoProcessorName();
    try {
      // fetch cloud instance information for the instance
      String cloudInstanceInformationProcessorClassName = CLOUD_PROCESSOR_PATH_PREFIX
          + _helixManagerProperty.getHelixCloudProperty().getCloudProvider().toLowerCase() + "."
          + cloudInstanceInformationProcessorName;
      Class processorClass = Class.forName(cloudInstanceInformationProcessorClassName);
      Constructor constructor = processorClass.getConstructor(HelixCloudProperty.class);
      CloudInstanceInformationProcessor processor = (CloudInstanceInformationProcessor) constructor
          .newInstance(_helixManagerProperty.getHelixCloudProperty());
      List<String> responses = processor.fetchCloudInstanceInformation();

      // parse cloud instance information for the participant
      CloudInstanceInformation cloudInstanceInformation =
          processor.parseCloudInstanceInformation(responses);
      return cloudInstanceInformation;
    } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
        | IllegalAccessException | InvocationTargetException ex) {
      throw new HelixException(
          "Failed to create a new instance for the class: " + cloudInstanceInformationProcessorName,
          ex);
    }
  }

  /**
   * Creates this participant's LiveInstance ephemeral node for the bound session and records the
   * online event in the participant history. If a live instance from a previous session still
   * exists, waits up to session-timeout + 5s for it to expire, then makes one final attempt.
   * @throws HelixException on session mismatch, a surviving duplicate live instance, or any other
   *           creation failure
   */
  private void createLiveInstance() {
    String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
    LiveInstance liveInstance = new LiveInstance(_instanceName);
    liveInstance.setSessionId(_sessionId);
    liveInstance.setHelixVersion(_manager.getVersion());
    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
    liveInstance.setCurrentTaskThreadPoolSize(
        TaskUtil.getTargetThreadPoolSize(_zkclient, _clusterName, _instanceName));

    // Merge any caller-provided extra fields into the live instance record.
    if (_liveInstanceInfoProvider != null) {
      LOG.info("invoke liveInstanceInfoProvider");
      ZNRecord additionalLiveInstanceInfo =
          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
      if (additionalLiveInstanceInfo != null) {
        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
        liveInstance = new LiveInstance(mergedLiveInstance);
        LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
      }
    }

    boolean retry;
    do {
      retry = false;
      try {
        // Zk session ID will be validated in createEphemeral.
        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord(), _sessionId);
        LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath,
            liveInstance.getEphemeralOwner());
      } catch (ZkSessionMismatchedException e) {
        // Preserve the cause so the original stack trace is not lost.
        throw new HelixException(
            "Failed to create live instance, path: " + liveInstancePath + ". Caused by: "
                + e.getMessage(), e);
      } catch (ZkNodeExistsException e) {
        LOG.warn("Found another instance with same instance name: {} in cluster: {}", _instanceName,
            _clusterName);
        Stat stat = new Stat();
        ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
        if (record == null) {
          // live-instance is gone as we check it, retry create live-instance
          retry = true;
        } else {
          // wait for a while, in case previous helix-participant exits unexpectedly
          // and its live-instance still hangs around until session timeout
          try {
            TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
          } catch (InterruptedException ex) {
            LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
          }
          // give a last try after exiting the while loop
          retry = true;
          break;
        }
      }
    } while (retry);

    // give a last shot
    if (retry) {
      try {
        // Zk session ID will be validated in createEphemeral.
        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord(), _sessionId);
        LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath,
            liveInstance.getEphemeralOwner());
      } catch (ZkSessionMismatchedException e) {
        throw new HelixException(
            "Failed to create live instance, path: " + liveInstancePath + ". Caused by: "
                + e.getMessage(), e);
      } catch (ZkNodeExistsException e) {
        throw new HelixException("Failed to create live instance because instance: " + _instanceName
            + " already has a live-instance in cluster: " + _clusterName + ". Path is: "
            + liveInstancePath, e);
      } catch (Exception e) {
        throw new HelixException("Failed to create live instance. " + e.getMessage(), e);
      }
    }

    ParticipantHistory history = getHistory();
    history.reportOnline(_sessionId, _manager.getVersion());
    persistHistory(history);
  }

  /**
   * carry over current-states from last sessions
   * set to initial state for current session only when state doesn't exist in current session
   * @param dataAccessor accessor for reading previous-session states and writing carried-over ones
   * @param instanceName the participant whose states are carried over
   * @param sessionId the current session; states from all other sessions are carried and removed
   * @param stateMachineEngine used to look up live state models when {@code setToInitState} is
   *          false
   * @param setToInitState true to reset each partition to the state model's initial state; false
   *          to copy the in-memory state from the local state model instances
   * @throws ZkClientException if a previous session's current-state folder cannot be removed
   */
  public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor dataAccessor,
      String instanceName, String sessionId, StateMachineEngine stateMachineEngine,
      boolean setToInitState) {
    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
    List<String> sessions = dataAccessor.getChildNames(keyBuilder.sessions(instanceName));

    for (String session : sessions) {
      if (session.equals(sessionId)) {
        continue;
      }

      // Ignore if any current states in the previous folder cannot be read.
      List<CurrentState> lastCurStates =
          dataAccessor.getChildValues(keyBuilder.currentStates(instanceName, session), false);
      for (CurrentState lastCurState : lastCurStates) {
        LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
            + " to current session: " + sessionId + ", setToInitState: " + setToInitState);
        String stateModelDefRef = lastCurState.getStateModelDefRef();
        if (stateModelDefRef == null) {
          LOG.error(
              "skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
                  + lastCurState);
          continue;
        }

        // If the the current state is related to tasks, there is no need to carry it over to new session.
        // Note: this check is not necessary due to TaskCurrentStates, but keep it for backwards compatibility
        if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
          continue;
        }

        StateModelDefinition stateModelDef =
            dataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
        // Guard against a state model definition that was removed from the cluster; carrying over
        // without it would NPE on getInitialState().
        if (stateModelDef == null) {
          LOG.error("skip carry-over because state model definition: " + stateModelDefRef
              + " is not found. previous current-state: " + lastCurState);
          continue;
        }
        String initState = stateModelDef.getInitialState();
        Map<String, String> partitionExpectedStateMap = new HashMap<>();
        if (setToInitState) {
          lastCurState.getPartitionStateMap().keySet()
              .forEach(partition -> partitionExpectedStateMap.put(partition, initState));
        } else {
          String factoryName = lastCurState.getStateModelFactoryName();
          StateModelFactory<? extends StateModel> stateModelFactory =
              stateMachineEngine.getStateModelFactory(stateModelDefRef, factoryName);
          lastCurState.getPartitionStateMap().keySet().forEach(partition -> {
            StateModel stateModel =
                stateModelFactory.getStateModel(lastCurState.getResourceName(), partition);
            if (stateModel != null) {
              partitionExpectedStateMap.put(partition, stateModel.getCurrentState());
            }
          });
        }

        BaseDataAccessor<ZNRecord> baseAccessor = dataAccessor.getBaseDataAccessor();
        String curStatePath =
            keyBuilder.currentState(instanceName, sessionId, lastCurState.getResourceName())
                .getPath();

        if (lastCurState.getBucketSize() > 0) {
          // update parent node
          ZNRecord metaRecord = new ZNRecord(lastCurState.getId());
          metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields());
          DataUpdater<ZNRecord> metaRecordUpdater =
              new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap,
                  new CurrentState(metaRecord));
          boolean success =
              baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT);
          if (success) {
            // update current state buckets
            ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize());
            Map<String, ZNRecord> map = bucketizer.bucketize(lastCurState.getRecord());
            List<String> paths = new ArrayList<>();
            List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
            for (String bucketName : map.keySet()) {
              paths.add(curStatePath + "/" + bucketName);
              updaters.add(new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap,
                  new CurrentState(map.get(bucketName))));
            }
            baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
          }
        } else {
          dataAccessor.getBaseDataAccessor().update(curStatePath,
              new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, lastCurState),
              AccessOption.PERSISTENT);
        }
      }
    }

    // remove previous current state parent nodes
    for (String session : sessions) {
      if (session.equals(sessionId)) {
        continue;
      }
      PropertyKey currentStatesProperty = keyBuilder.currentStates(instanceName, session);
      String path = currentStatesProperty.getPath();
      LOG.info("Removing current states from previous sessions. path: {}", path);
      if (!dataAccessor.removeProperty(currentStatesProperty)) {
        throw new ZkClientException("Failed to delete " + path);
      }
    }
  }

  /**
   * Remove all previous task current state sessions
   */
  private void removePreviousTaskCurrentStates() {
    for (String session : _dataAccessor
        .getChildNames(_keyBuilder.taskCurrentStateSessions(_instanceName))) {
      if (session.equals(_sessionId)) {
        continue;
      }
      String path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath();
      LOG.info("Removing task current states from previous sessions. path: {}", path);
      _zkclient.deleteRecursively(path);
    }
  }

  /**
   * Registers the state machine engine's message handlers, starts listening for messages, and
   * registers the scheduled-task state model factory, then signals the messaging service that the
   * participant is connected.
   */
  private void setupMsgHandler() throws Exception {
    _messagingService.registerMessageHandlerFactory(_stateMachineEngine.getMessageTypes(),
        _stateMachineEngine);
    _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);

    ScheduledTaskStateModelFactory stStateModelFactory =
        new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
    _stateMachineEngine.registerStateModelFactory(
        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
    _messagingService.onConnected();
  }

  /**
   * Reads this participant's history record, creating a fresh one if none exists yet.
   */
  private ParticipantHistory getHistory() {
    PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
    ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
    if (history == null) {
      history = new ParticipantHistory(_instanceName);
    }
    return history;
  }

  /**
   * Writes the participant history back to zk; failure is logged but not fatal since history is
   * informational.
   */
  private void persistHistory(ParticipantHistory history) {
    PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
    if (!_dataAccessor.setProperty(propertyKey, history)) {
      LOG.error("Failed to persist participant history to zk!");
    }
  }

  public void reset() {
  }

  /**
   * Records the participant as offline in its history (best effort) and resets local state.
   */
  public void disconnect() {
    try {
      ParticipantHistory history = getHistory();
      history.reportOffline();
      persistHistory(history);
    } catch (Exception e) {
      LOG.error("Failed to report participant offline.", e);
    }
    reset();
  }
}