| package org.apache.helix.controller.stages; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.controller.LogUtil; |
| import org.apache.helix.controller.common.PartitionStateMap; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; |
| import org.apache.helix.controller.pipeline.AsyncWorkerType; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.MasterSlaveSMD; |
| import org.apache.helix.model.Partition; |
| import org.apache.helix.model.Resource; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Persist the ResourceAssignment of each resource that went through rebalancing |
| */ |
| public class PersistAssignmentStage extends AbstractAsyncBaseStage { |
| private static final Logger LOG = LoggerFactory.getLogger(PersistAssignmentStage.class); |
| |
| @Override |
| public AsyncWorkerType getAsyncWorkerType() { |
| return AsyncWorkerType.PersistAssignmentWorker; |
| } |
| |
| @Override |
| public void execute(final ClusterEvent event) throws Exception { |
| ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); |
| ClusterConfig clusterConfig = cache.getClusterConfig(); |
| |
| if (!clusterConfig.isPersistBestPossibleAssignment() && !clusterConfig |
| .isPersistIntermediateAssignment()) { |
| return; |
| } |
| |
| BestPossibleStateOutput bestPossibleAssignment = |
| event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); |
| |
| HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); |
| HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); |
| |
| for (String resourceId : bestPossibleAssignment.resourceSet()) { |
| try { |
| persistAssignment(resourceMap.get(resourceId), cache, event, bestPossibleAssignment, |
| clusterConfig, accessor, keyBuilder); |
| } catch (HelixException ex) { |
| LogUtil |
| .logError(LOG, _eventId, "Failed to persist assignment for resource " + resourceId, ex); |
| } |
| } |
| } |
| |
| private void persistAssignment(final Resource resource, final ResourceControllerDataProvider cache, |
| final ClusterEvent event, final BestPossibleStateOutput bestPossibleAssignment, |
| final ClusterConfig clusterConfig, final HelixDataAccessor accessor, |
| final PropertyKey.Builder keyBuilder) { |
| String resourceId = resource.getResourceName(); |
| if (resource != null) { |
| final IdealState idealState = cache.getIdealState(resourceId); |
| if (idealState == null) { |
| LogUtil.logWarn(LOG, event.getEventId(), "IdealState not found for resource " + resourceId); |
| return; |
| } |
| IdealState.RebalanceMode mode = idealState.getRebalanceMode(); |
| if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode |
| .equals(IdealState.RebalanceMode.FULL_AUTO)) { |
| // do not persist assignment for resource in neither semi or full auto. |
| return; |
| } |
| |
| // Record IdealState delta in a different object. Avoid modifying the cached IdealState object |
| // which is supposed to be read only. |
| IdealState delta = new IdealState(resourceId); |
| boolean needPersist = false; |
| if (mode.equals(IdealState.RebalanceMode.FULL_AUTO)) { |
| // persist preference list in ful-auto mode. |
| Map<String, List<String>> newLists = bestPossibleAssignment.getPreferenceLists(resourceId); |
| if (newLists != null && hasPreferenceListChanged(newLists, idealState)) { |
| delta.setPreferenceLists(newLists); |
| needPersist = true; |
| } |
| } |
| |
| PartitionStateMap partitionStateMap = bestPossibleAssignment.getPartitionStateMap(resourceId); |
| if (clusterConfig.isPersistIntermediateAssignment()) { |
| IntermediateStateOutput intermediateAssignment = |
| event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); |
| partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId); |
| } |
| |
| //TODO: temporary solution for Espresso/Dbus backcompatible, should remove this. |
| Map<Partition, Map<String, String>> assignmentToPersist = |
| convertAssignmentPersisted(resource, idealState, partitionStateMap.getStateMap()); |
| |
| if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) { |
| for (Partition partition : assignmentToPersist.keySet()) { |
| Map<String, String> instanceMap = assignmentToPersist.get(partition); |
| delta.setInstanceStateMap(partition.getPartitionName(), instanceMap); |
| } |
| needPersist = true; |
| } |
| |
| if (needPersist) { |
| // Update instead of set to ensure any intermediate changes that the controller does not update are kept. |
| accessor.updateProperty(keyBuilder.idealStates(resourceId), current -> { |
| if (current != null) { |
| ZNRecord deltaRecord = delta.getRecord(); |
| // Overwrite MapFields and ListFields items with the same key. |
| if (!deltaRecord.getMapFields().isEmpty()) { |
| // Note that default merge will keep old values in the maps unchanged, which is not desired. |
| current.getMapFields().clear(); |
| current.getMapFields().putAll(deltaRecord.getMapFields()); |
| } |
| if (!deltaRecord.getListFields().isEmpty()) { |
| // Don't clear the list fields since the key might be user's input. And even the |
| // corresponding list is empty, we shall not remove it. Otherwise, it may cause |
| // rebalance failure. |
| current.getListFields().putAll(deltaRecord.getListFields()); |
| } |
| } |
| return current; |
| }, idealState); |
| } |
| } |
| } |
| |
| /** |
| * has the preference list changed from the one persisted in current IdealState |
| */ |
| private boolean hasPreferenceListChanged(Map<String, List<String>> newLists, |
| IdealState idealState) { |
| Map<String, List<String>> existLists = idealState.getPreferenceLists(); |
| |
| Set<String> partitions = new HashSet<String>(newLists.keySet()); |
| partitions.addAll(existLists.keySet()); |
| |
| for (String partition : partitions) { |
| List<String> assignedInstances = newLists.get(partition); |
| List<String> existingInstances = existLists.get(partition); |
| if (assignedInstances == null && existingInstances == null) { |
| continue; |
| } |
| if (assignedInstances == null || existingInstances == null || !assignedInstances |
| .equals(existingInstances)) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| private boolean hasInstanceMapChanged(Map<Partition, Map<String, String>> newAssiments, |
| IdealState idealState) { |
| Set<Partition> partitions = new HashSet<Partition>(newAssiments.keySet()); |
| for (String p : idealState.getPartitionSet()) { |
| partitions.add(new Partition(p)); |
| } |
| |
| for (Partition partition : partitions) { |
| Map<String, String> instanceMap = newAssiments.get(partition); |
| Map<String, String> existInstanceMap = |
| idealState.getInstanceStateMap(partition.getPartitionName()); |
| if (instanceMap == null && existInstanceMap == null) { |
| continue; |
| } |
| if (instanceMap == null || existInstanceMap == null || !instanceMap |
| .equals(existInstanceMap)) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * TODO: This is a temporary hacky for back-compatible support of Espresso and Databus, we should |
| * get rid of this conversion as soon as possible. --- Lei, 2016/9/9. |
| */ |
| private Map<Partition, Map<String, String>> convertAssignmentPersisted(Resource resource, |
| IdealState idealState, Map<Partition, Map<String, String>> assignments) { |
| String stateModelDef = idealState.getStateModelDefRef(); |
| /** Only convert for MasterSlave resources */ |
| if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name()) || idealState |
| .getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)) { |
| return assignments; |
| } |
| |
| Map<Partition, Map<String, String>> assignmentToPersist = |
| new HashMap<Partition, Map<String, String>>(); |
| |
| for (Partition partition : resource.getPartitions()) { |
| Map<String, String> instanceMap = new HashMap<String, String>(); |
| Map<String, String> assignment = assignments.get(partition); |
| if (assignment != null) { |
| instanceMap.putAll(assignment); |
| } |
| |
| List<String> preferenceList = idealState.getPreferenceList(partition.getPartitionName()); |
| if (preferenceList == null) { |
| preferenceList = Collections.emptyList(); |
| } |
| Set<String> nodeList = new HashSet<String>(preferenceList); |
| nodeList.addAll(assignment.keySet()); |
| boolean hasMaster = false; |
| for (String ins : nodeList) { |
| String state = instanceMap.get(ins); |
| if (state == null || (!state.equals(MasterSlaveSMD.States.SLAVE.name()) && !state |
| .equals(MasterSlaveSMD.States.MASTER.name()))) { |
| instanceMap.put(ins, MasterSlaveSMD.States.SLAVE.name()); |
| } |
| |
| if (state != null && state.equals(MasterSlaveSMD.States.MASTER.name())) { |
| hasMaster = true; |
| } |
| } |
| |
| // if no master, just pick the first node in the preference list as the master. |
| if (!hasMaster && preferenceList.size() > 0) { |
| instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name()); |
| } |
| |
| assignmentToPersist.put(partition, instanceMap); |
| } |
| |
| return assignmentToPersist; |
| } |
| } |