| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.lang.management.ManagementFactory; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.I0Itec.zkclient.DataUpdater; |
| import org.I0Itec.zkclient.exception.ZkNodeExistsException; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.LiveInstanceInfoProvider; |
| import org.apache.helix.PreConnectCallback; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.ZNRecord; |
| import org.apache.helix.ZNRecordBucketizer; |
| import org.apache.helix.manager.zk.client.HelixZkClient; |
| import org.apache.helix.messaging.DefaultMessagingService; |
| import org.apache.helix.model.CurrentState; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.ParticipantHistory; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.participant.StateMachineEngine; |
| import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Class to handle all session related work for a participant. |
| */ |
| public class ParticipantManager { |
| private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class); |
| |
| final HelixZkClient _zkclient; |
| final HelixManager _manager; |
| final PropertyKey.Builder _keyBuilder; |
| final String _clusterName; |
| final String _instanceName; |
| final int _sessionTimeout; |
| final ConfigAccessor _configAccessor; |
| final InstanceType _instanceType; |
| final HelixAdmin _helixAdmin; |
| final ZKHelixDataAccessor _dataAccessor; |
| final DefaultMessagingService _messagingService; |
| final StateMachineEngine _stateMachineEngine; |
| final LiveInstanceInfoProvider _liveInstanceInfoProvider; |
| final List<PreConnectCallback> _preConnectCallbacks; |
| |
| // zk session id should be immutable after participant manager is created. This is to avoid |
| // session race condition when handling new session for the participant. |
| private final String _sessionId; |
| |
| public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout, |
| LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks, |
| final String sessionId) { |
| _zkclient = zkclient; |
| _manager = manager; |
| _clusterName = manager.getClusterName(); |
| _instanceName = manager.getInstanceName(); |
| _keyBuilder = new PropertyKey.Builder(_clusterName); |
| _sessionId = sessionId; |
| _sessionTimeout = sessionTimeout; |
| _configAccessor = manager.getConfigAccessor(); |
| _instanceType = manager.getInstanceType(); |
| _helixAdmin = manager.getClusterManagmentTool(); |
| _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor(); |
| _messagingService = (DefaultMessagingService) manager.getMessagingService(); |
| _stateMachineEngine = manager.getStateMachineEngine(); |
| _liveInstanceInfoProvider = liveInstanceInfoProvider; |
| _preConnectCallbacks = preConnectCallbacks; |
| } |
| |
| /** |
| * Handles a new session for a participant. The new session's id is passed in when participant |
| * manager is created, as it is required to prevent ephemeral node creation from session race |
| * condition: ephemeral node is created by an expired or unexpected session. |
| * |
| * @throws Exception |
| */ |
| public void handleNewSession() throws Exception { |
| // Check zk session of this participant is still valid. |
| // If not, skip handling new session for this participant. |
| final String zkClientHexSession = ZKUtil.toHexSessionId(_zkclient.getSessionId()); |
| if (!zkClientHexSession.equals(_sessionId)) { |
| throw new HelixException( |
| "Failed to handle new session for participant. There is a session mismatch: " |
| + "participant manager session = " + _sessionId + ", zk client session = " |
| + zkClientHexSession); |
| } |
| |
| joinCluster(); |
| |
| /** |
| * Invoke PreConnectCallbacks |
| */ |
| for (PreConnectCallback callback : _preConnectCallbacks) { |
| callback.onPreConnect(); |
| } |
| |
| // TODO create live instance node after all the init works done --JJ |
| // This will help to prevent controller from sending any message prematurely. |
| // Live instance creation also checks if the expected session is valid or not. Live instance |
| // should not be created by an expired zk session. |
| createLiveInstance(); |
| carryOverPreviousCurrentState(); |
| |
| /** |
| * setup message listener |
| */ |
| setupMsgHandler(); |
| } |
| |
| private void joinCluster() { |
| // Read cluster config and see if instance can auto join the cluster |
| boolean autoJoin = false; |
| try { |
| HelixConfigScope scope = |
| new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster( |
| _manager.getClusterName()).build(); |
| autoJoin = |
| Boolean.parseBoolean(_configAccessor.get(scope, |
| ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)); |
| LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin); |
| } catch (Exception e) { |
| // autoJoin is false |
| } |
| |
| if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) { |
| if (!autoJoin) { |
| throw new HelixException("Initial cluster structure is not set up for instance: " |
| + _instanceName + ", instanceType: " + _instanceType); |
| } else { |
| LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName); |
| InstanceConfig instanceConfig = new InstanceConfig(_instanceName); |
| String hostName = _instanceName; |
| String port = ""; |
| int lastPos = _instanceName.lastIndexOf("_"); |
| if (lastPos > 0) { |
| hostName = _instanceName.substring(0, lastPos); |
| port = _instanceName.substring(lastPos + 1); |
| } |
| instanceConfig.setHostName(hostName); |
| instanceConfig.setPort(port); |
| instanceConfig.setInstanceEnabled(true); |
| _helixAdmin.addInstance(_clusterName, instanceConfig); |
| } |
| } |
| } |
| |
| private void createLiveInstance() { |
| String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath(); |
| LiveInstance liveInstance = new LiveInstance(_instanceName); |
| liveInstance.setSessionId(_sessionId); |
| liveInstance.setHelixVersion(_manager.getVersion()); |
| liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); |
| |
| // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider; |
| if (_liveInstanceInfoProvider != null) { |
| LOG.info("invoke liveInstanceInfoProvider"); |
| ZNRecord additionalLiveInstanceInfo = |
| _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo(); |
| if (additionalLiveInstanceInfo != null) { |
| additionalLiveInstanceInfo.merge(liveInstance.getRecord()); |
| ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName); |
| liveInstance = new LiveInstance(mergedLiveInstance); |
| LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance); |
| } |
| } |
| |
| boolean retry; |
| do { |
| retry = false; |
| try { |
| // Zk session ID will be validated in createEphemeral. |
| _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord(), _sessionId); |
| LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath, |
| liveInstance.getEphemeralOwner()); |
| } catch (ZkSessionMismatchedException e) { |
| throw new HelixException( |
| "Failed to create live instance, path: " + liveInstancePath + ". Caused by: " |
| + e.getMessage()); |
| } catch (ZkNodeExistsException e) { |
| LOG.warn("Found another instance with same instance name: {} in cluster: {}", _instanceName, |
| _clusterName); |
| |
| Stat stat = new Stat(); |
| ZNRecord record = _zkclient.readData(liveInstancePath, stat, true); |
| if (record == null) { |
| /** |
| * live-instance is gone as we check it, retry create live-instance |
| */ |
| retry = true; |
| } else { |
| /** |
| * wait for a while, in case previous helix-participant exits unexpectedly |
| * and its live-instance still hangs around until session timeout |
| */ |
| try { |
| TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000); |
| } catch (InterruptedException ex) { |
| LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex); |
| } |
| /** |
| * give a last try after exit while loop |
| */ |
| retry = true; |
| break; |
| } |
| } |
| } while (retry); |
| |
| /** |
| * give a last shot |
| */ |
| if (retry) { |
| try { |
| // Zk session ID will be validated in createEphemeral. |
| _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord(), _sessionId); |
| LOG.info("LiveInstance created, path: {}, sessionId: {}", liveInstancePath, |
| liveInstance.getEphemeralOwner()); |
| } catch (ZkSessionMismatchedException e) { |
| throw new HelixException( |
| "Failed to create live instance, path: " + liveInstancePath + ". Caused by: " |
| + e.getMessage()); |
| } catch (ZkNodeExistsException e) { |
| throw new HelixException("Failed to create live instance because instance: " + _instanceName |
| + " already has a live-instance in cluster: " + _clusterName + ". Path is: " |
| + liveInstancePath); |
| } catch (Exception e) { |
| throw new HelixException("Failed to create live instance. " + e.getMessage()); |
| } |
| } |
| |
| ParticipantHistory history = getHistory(); |
| history.reportOnline(_sessionId, _manager.getVersion()); |
| persistHistory(history); |
| } |
| |
| /** |
| * carry over current-states from last sessions |
| * set to initial state for current session only when state doesn't exist in current session |
| */ |
| private void carryOverPreviousCurrentState() { |
| List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName)); |
| |
| for (String session : sessions) { |
| if (session.equals(_sessionId)) { |
| continue; |
| } |
| |
| List<CurrentState> lastCurStates = |
| _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session)); |
| |
| for (CurrentState lastCurState : lastCurStates) { |
| LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId() |
| + " to current session: " + _sessionId); |
| String stateModelDefRef = lastCurState.getStateModelDefRef(); |
| if (stateModelDefRef == null) { |
| LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " |
| + lastCurState); |
| continue; |
| } |
| StateModelDefinition stateModel = |
| _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef)); |
| |
| BaseDataAccessor<ZNRecord> baseAccessor = _dataAccessor.getBaseDataAccessor(); |
| String curStatePath = |
| _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()) |
| .getPath(); |
| |
| String initState = stateModel.getInitialState(); |
| if (lastCurState.getBucketSize() > 0) { |
| // update parent node |
| ZNRecord metaRecord = new ZNRecord(lastCurState.getId()); |
| metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields()); |
| DataUpdater<ZNRecord> metaRecordUpdater = |
| new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(metaRecord)); |
| boolean success = |
| baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT); |
| if (success) { |
| // update current state buckets |
| ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize()); |
| |
| Map<String, ZNRecord> map = bucketizer.bucketize(lastCurState.getRecord()); |
| List<String> paths = new ArrayList<String>(); |
| List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>(); |
| for (String bucketName : map.keySet()) { |
| paths.add(curStatePath + "/" + bucketName); |
| updaters.add(new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(map |
| .get(bucketName)))); |
| } |
| |
| baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); |
| } |
| |
| } else { |
| _dataAccessor.getBaseDataAccessor().update(curStatePath, |
| new CurStateCarryOverUpdater(_sessionId, initState, lastCurState), |
| AccessOption.PERSISTENT); |
| } |
| } |
| } |
| |
| /** |
| * remove previous current state parent nodes |
| */ |
| for (String session : sessions) { |
| if (session.equals(_sessionId)) { |
| continue; |
| } |
| |
| String path = _keyBuilder.currentStates(_instanceName, session).getPath(); |
| LOG.info("Removing current states from previous sessions. path: " + path); |
| _zkclient.deleteRecursively(path); |
| } |
| } |
| |
| private void setupMsgHandler() throws Exception { |
| _messagingService.registerMessageHandlerFactory(_stateMachineEngine.getMessageTypes(), |
| _stateMachineEngine); |
| _manager.addMessageListener(_messagingService.getExecutor(), _instanceName); |
| |
| ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor()); |
| _stateMachineEngine.registerStateModelFactory( |
| DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory); |
| _messagingService.onConnected(); |
| } |
| |
| private ParticipantHistory getHistory() { |
| PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName); |
| ParticipantHistory history = _dataAccessor.getProperty(propertyKey); |
| if (history == null) { |
| history = new ParticipantHistory(_instanceName); |
| } |
| return history; |
| } |
| |
| private void persistHistory(ParticipantHistory history) { |
| PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName); |
| if (!_dataAccessor.setProperty(propertyKey, history)) { |
| LOG.error("Failed to persist participant history to zk!"); |
| } |
| } |
| |
| public void reset() { |
| } |
| |
| public void disconnect() { |
| try { |
| ParticipantHistory history = getHistory(); |
| history.reportOffline(); |
| persistHistory(history); |
| } catch (Exception e) { |
| LOG.error("Failed to report participant offline.", e); |
| } |
| reset(); |
| } |
| } |