| package org.apache.helix.participant; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.Consumer; |
| |
| import com.google.common.collect.ImmutableList; |
| import org.apache.helix.HelixConstants; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.NotificationContext; |
| import org.apache.helix.NotificationContext.MapKey; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.messaging.handling.BatchMessageHandler; |
| import org.apache.helix.messaging.handling.BatchMessageWrapper; |
| import org.apache.helix.messaging.handling.HelixStateTransitionCancellationHandler; |
| import org.apache.helix.messaging.handling.HelixStateTransitionHandler; |
| import org.apache.helix.messaging.handling.MessageHandler; |
| import org.apache.helix.messaging.handling.TaskExecutor; |
| import org.apache.helix.model.CurrentState; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.Message.MessageType; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.participant.statemachine.StateModel; |
| import org.apache.helix.participant.statemachine.StateModelFactory; |
| import org.apache.helix.participant.statemachine.StateModelParser; |
| import org.apache.helix.task.TaskConstants; |
| import org.apache.helix.task.TaskPartitionState; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class HelixStateMachineEngine implements StateMachineEngine { |
| private static Logger logger = LoggerFactory.getLogger(HelixStateMachineEngine.class); |
| |
| // StateModelName->FactoryName->StateModelFactory |
| private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap; |
| private final StateModelParser _stateModelParser; |
| private final HelixManager _manager; |
| private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs; |
| |
| public HelixStateMachineEngine(HelixManager manager) { |
| _stateModelParser = new StateModelParser(); |
| _manager = manager; |
| |
| _stateModelFactoryMap = |
| new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>(); |
| _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>(); |
| } |
| |
| @Override |
| public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) { |
| return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY); |
| } |
| |
| @Override |
| public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName, |
| String factoryName) { |
| if (!_stateModelFactoryMap.containsKey(stateModelName)) { |
| return null; |
| } |
| return _stateModelFactoryMap.get(stateModelName).get(factoryName); |
| } |
| |
| @Override |
| public boolean registerStateModelFactory(String stateModelDef, |
| StateModelFactory<? extends StateModel> factory) { |
| return registerStateModelFactory(stateModelDef, factory, |
| HelixConstants.DEFAULT_STATE_MODEL_FACTORY); |
| } |
| |
| @Override |
| public boolean registerStateModelFactory(String stateModelName, |
| StateModelFactory<? extends StateModel> factory, String factoryName) { |
| if (stateModelName == null || factory == null || factoryName == null) { |
| throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null"); |
| } |
| |
| logger.info("Register state model factory for state model " + stateModelName |
| + " using factory name " + factoryName + " with " + factory); |
| |
| if (!_stateModelFactoryMap.containsKey(stateModelName)) { |
| _stateModelFactoryMap.put(stateModelName, |
| new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>()); |
| } |
| |
| if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) { |
| logger.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName |
| + " has already been registered."); |
| return false; |
| } |
| |
| _stateModelFactoryMap.get(stateModelName).put(factoryName, factory); |
| sendNopMessage(); |
| return true; |
| } |
| |
| // TODO: duplicated code in DefaultMessagingService |
| private void sendNopMessage() { |
| if (_manager.isConnected()) { |
| try { |
| Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString()); |
| nopMsg.setSrcName(_manager.getInstanceName()); |
| |
| HelixDataAccessor accessor = _manager.getHelixDataAccessor(); |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| if (_manager.getInstanceType() == InstanceType.CONTROLLER |
| || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) { |
| nopMsg.setTgtName(InstanceType.CONTROLLER.name()); |
| accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg); |
| } |
| |
| if (_manager.getInstanceType() == InstanceType.PARTICIPANT |
| || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) { |
| nopMsg.setTgtName(_manager.getInstanceName()); |
| accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg); |
| } |
| logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " + nopMsg.getId()); |
| } catch (Exception e) { |
| logger.error(e.toString()); |
| } |
| } |
| } |
| |
| @Override |
| public void reset() { |
| logger.info("Resetting HelixStateMachineEngine"); |
| loopStateModelFactories(stateModel -> { |
| stateModel.reset(); |
| String initialState = _stateModelParser.getInitialState(stateModel.getClass()); |
| stateModel.updateState(initialState); |
| }); |
| logger.info("Successfully reset HelixStateMachineEngine"); |
| } |
| |
| @Override |
| public void sync() { |
| logger.info("Syncing HelixStateMachineEngine"); |
| loopStateModelFactories(StateModel::syncState); |
| logger.info("Successfully synced HelixStateMachineEngine"); |
| } |
| |
| private void loopStateModelFactories(Consumer<StateModel> consumer) { |
| for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap |
| .values()) { |
| for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) { |
| for (String resourceName : stateModelFactory.getResourceSet()) { |
| for (String partitionKey : stateModelFactory.getPartitionSet(resourceName)) { |
| logger.info("Operating on {}::{}", resourceName, partitionKey); |
| StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey); |
| if (stateModel != null) { |
| consumer.accept(stateModel); |
| // TODO probably should update the state on ZK. Shi confirm what needs |
| // to be done here. |
| } else { |
| // TODO: If stateModel is null, we might need to do something here |
| // This reset() is not synchronized. We observed that during a shutdown (where |
| // resources |
| // are all dropped), an NPE could be possible due to stateModel being null |
| // Two cases are possible: 1) removing a partition/resource 2) adding a |
| // partition/resource |
| // We may need to add more processing here to make sure things are being set to |
| // initialState. Otherwise, there might be inconsistencies that might cause partitions |
| // to be stuck in some state (because reset() would be a NOP here) |
| logger.warn("Failed operation due to StateModel being null! Resource: {}, Partition: {}", |
| resourceName, partitionKey); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public MessageHandler createHandler(Message message, NotificationContext context) { |
| String type = message.getMsgType(); |
| |
| if (!type.equals(MessageType.STATE_TRANSITION.name()) && !type |
| .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { |
| throw new HelixException("Expect state-transition message type, but was " |
| + message.getMsgType() + ", msgId: " + message.getMsgId()); |
| } |
| |
| String partitionKey = message.getPartitionName(); |
| String stateModelName = message.getStateModelDef(); |
| String resourceName = message.getResourceName(); |
| String sessionId = message.getTgtSessionId(); |
| int bucketSize = message.getBucketSize(); |
| |
| if (stateModelName == null) { |
| logger |
| .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: " |
| + message.getId()); |
| return null; |
| } |
| |
| String factoryName = message.getStateModelFactoryName(); |
| if (factoryName == null) { |
| factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY; |
| } |
| |
| StateModelFactory<? extends StateModel> stateModelFactory = |
| getStateModelFactory(stateModelName, factoryName); |
| if (stateModelFactory == null) { |
| logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: " |
| + stateModelName + " using factoryName: " + factoryName + " for resource: " |
| + resourceName); |
| return null; |
| } |
| |
| // check if the state model definition exists and cache it |
| if (!_stateModelDefs.containsKey(stateModelName)) { |
| HelixDataAccessor accessor = _manager.getHelixDataAccessor(); |
| Builder keyBuilder = accessor.keyBuilder(); |
| StateModelDefinition stateModelDef = |
| accessor.getProperty(keyBuilder.stateModelDef(stateModelName)); |
| if (stateModelDef == null) { |
| throw new HelixException("fail to create msg-handler because stateModelDef for " |
| + stateModelName + " does NOT exist"); |
| } |
| _stateModelDefs.put(stateModelName, stateModelDef); |
| } |
| |
| if (!message.getBatchMessageMode()) { |
| String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState(); |
| StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey); |
| if (stateModel == null) { |
| stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey); |
| if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME) |
| && message.getToState().equals(TaskPartitionState.DROPPED.name())) { |
| // If stateModel is null, that means there was a reboot of the Participant. Then the |
| // purpose of this first message must be to drop the task. We manually set the current |
| // state to be the same state of fromState (which Controller inferred from JobContext) to |
| // allow the Participant to successfully process this dropping transition |
| stateModel.updateState(message.getFromState()); |
| } else { |
| stateModel.updateState(initState); |
| } |
| } |
| if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { |
| return new HelixStateTransitionCancellationHandler(stateModel, message, context); |
| } else { |
| // create currentStateDelta for this partition |
| // TODO: move currentStateDelta to StateTransitionMsgHandler |
| CurrentState currentStateDelta = new CurrentState(resourceName); |
| currentStateDelta.setSessionId(sessionId); |
| currentStateDelta.setStateModelDefRef(stateModelName); |
| currentStateDelta.setStateModelFactoryName(factoryName); |
| currentStateDelta.setBucketSize(bucketSize); |
| |
| currentStateDelta.setState(partitionKey, |
| (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState()); |
| |
| return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, |
| currentStateDelta); |
| } |
| } else { |
| BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName); |
| if (wrapper == null) { |
| wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName); |
| } |
| |
| // get executor-service for the message |
| TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString()); |
| if (executor == null) { |
| logger.error( |
| "fail to get executor-service for batch message: " + message.getId() + ". msgType: " |
| + message.getMsgType() + ", resource: " + message.getResourceName()); |
| return null; |
| } |
| return new BatchMessageHandler(message, context, this, wrapper, executor); |
| } |
| } |
| |
| @Override public List<String> getMessageTypes() { |
| return ImmutableList |
| .of(MessageType.STATE_TRANSITION.name(), MessageType.STATE_TRANSITION_CANCELLATION.name()); |
| } |
| |
| @Override |
| public boolean removeStateModelFactory(String stateModelDef, |
| StateModelFactory<? extends StateModel> factory) { |
| throw new UnsupportedOperationException("Remove not yet supported"); |
| } |
| |
| @Override |
| public boolean removeStateModelFactory(String stateModelDef, |
| StateModelFactory<? extends StateModel> factory, String factoryName) { |
| throw new UnsupportedOperationException("Remove not yet supported"); |
| } |
| } |