| package org.apache.helix.controller.stages; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixDefinedState; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.controller.LogUtil; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; |
| import org.apache.helix.controller.pipeline.AsyncWorkerType; |
| import org.apache.helix.controller.pipeline.StageException; |
| import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.Message.MessageType; |
| import org.apache.helix.model.Partition; |
| import org.apache.helix.model.Resource; |
| import org.apache.helix.model.ResourceConfig; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.model.StatusUpdate; |
| import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.datamodel.ZNRecordDelta; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ExternalViewComputeStage extends AbstractAsyncBaseStage { |
| private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class); |
| |
| @Override |
| public AsyncWorkerType getAsyncWorkerType() { |
| return AsyncWorkerType.ExternalViewComputeWorker; |
| } |
| |
| @Override |
| public void execute(final ClusterEvent event) throws Exception { |
| _eventId = event.getEventId(); |
| HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); |
| Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); |
| ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); |
| |
| if (manager == null || resourceMap == null || cache == null) { |
| throw new StageException("Missing attributes in event:" + event |
| + ". Requires ClusterManager|RESOURCES|DataCache"); |
| } |
| |
| HelixDataAccessor dataAccessor = manager.getHelixDataAccessor(); |
| PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); |
| |
| CurrentStateOutput currentStateOutput = |
| event.getAttribute(AttributeName.CURRENT_STATE.name()); |
| ClusterStatusMonitor clusterStatusMonitor = |
| event.getAttribute(AttributeName.clusterStatusMonitor.name()); |
| |
| List<ExternalView> newExtViews = new ArrayList<>(); |
| Set<String> monitoringResources = new HashSet<>(); |
| |
| Map<String, ExternalView> curExtViews = cache.getExternalViews(); |
| |
| for (Resource resource : resourceMap.values()) { |
| try { |
| computeExternalView(resource, currentStateOutput, cache, clusterStatusMonitor, curExtViews, |
| manager, monitoringResources, newExtViews); |
| } catch (HelixException ex) { |
| LogUtil.logError(LOG, _eventId, |
| "Failed to calculate external view for resource " + resource.getResourceName(), ex); |
| } |
| } |
| |
| // Keep MBeans for existing resources and unregister MBeans for dropped resources |
| if (clusterStatusMonitor != null) { |
| clusterStatusMonitor.retainResourceMonitor(monitoringResources); |
| } |
| |
| List<String> externalViewsToRemove = new ArrayList<>(); |
| // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all. |
| // Are there any entity that will be interested in its change? |
| |
| // For the resource with DisableExternalView option turned on in IdealState |
| // We will not actually create or write the externalView to ZooKeeper. |
| List<PropertyKey> keys = new ArrayList<>(); |
| for(Iterator<ExternalView> it = newExtViews.iterator(); it.hasNext(); ) { |
| ExternalView view = it.next(); |
| String resourceName = view.getResourceName(); |
| IdealState idealState = cache.getIdealState(resourceName); |
| if (idealState != null && idealState.isExternalViewDisabled()) { |
| it.remove(); |
| // remove the external view if the external view exists |
| if (curExtViews.containsKey(resourceName)) { |
| LogUtil |
| .logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); |
| dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); |
| externalViewsToRemove.add(resourceName); |
| } |
| } else { |
| keys.add(keyBuilder.externalView(resourceName)); |
| } |
| } |
| |
| // add/update external-views |
| if (newExtViews.size() > 0) { |
| dataAccessor.setChildren(keys, newExtViews); |
| cache.updateExternalViews(newExtViews); |
| } |
| |
| // remove dead external-views |
| for (String resourceName : curExtViews.keySet()) { |
| if (!resourceMap.keySet().contains(resourceName)) { |
| LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); |
| dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); |
| externalViewsToRemove.add(resourceName); |
| } |
| } |
| cache.removeExternalViews(externalViewsToRemove); |
| } |
| |
| private void computeExternalView(final Resource resource, |
| final CurrentStateOutput currentStateOutput, final ResourceControllerDataProvider cache, |
| final ClusterStatusMonitor clusterStatusMonitor, final Map<String, ExternalView> curExtViews, |
| final HelixManager manager, Set<String> monitoringResources, List<ExternalView> newExtViews) { |
| String resourceName = resource.getResourceName(); |
| ExternalView view = new ExternalView(resource.getResourceName()); |
| // if resource ideal state has bucket size, set it |
| // otherwise resource has been dropped, use bucket size from current state instead |
| if (resource.getBucketSize() > 0) { |
| view.setBucketSize(resource.getBucketSize()); |
| } else { |
| view.setBucketSize(currentStateOutput.getBucketSize(resourceName)); |
| } |
| |
| int totalPendingMessageCount = 0; |
| |
| for (Partition partition : resource.getPartitions()) { |
| Map<String, String> currentStateMap = |
| currentStateOutput.getCurrentStateMap(resourceName, partition); |
| if (currentStateMap != null && currentStateMap.size() > 0) { |
| for (String instance : currentStateMap.keySet()) { |
| view.setState(partition.getPartitionName(), instance, currentStateMap.get(instance)); |
| } |
| } |
| totalPendingMessageCount += |
| currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size(); |
| } |
| |
| // Update cluster status monitor mbean |
| IdealState idealState = cache.getIdealState(resourceName); |
| ResourceConfig resourceConfig = cache.getResourceConfig(resourceName); |
| if (clusterStatusMonitor != null) { |
| if (idealState != null // has ideal state |
| && (resourceConfig == null || !resourceConfig.isMonitoringDisabled()) // monitoring not disabled |
| && !idealState.getStateModelDefRef() // and not a job resource |
| .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { |
| clusterStatusMonitor |
| .setResourcePendingMessages(resourceName ,totalPendingMessageCount); |
| monitoringResources.add(resourceName); |
| } |
| } |
| |
| ExternalView curExtView = curExtViews.get(resourceName); |
| // copy simplefields from IS, in cases where IS is deleted copy it from existing ExternalView |
| if (idealState != null) { |
| view.getRecord().getSimpleFields().putAll(idealState.getRecord().getSimpleFields()); |
| } else if (curExtView != null) { |
| view.getRecord().getSimpleFields().putAll(curExtView.getRecord().getSimpleFields()); |
| } |
| |
| // compare the new external view with current one, set only on different |
| if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) { |
| // Add external view to the list which will be written to ZK later. |
| newExtViews.add(view); |
| |
| // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which |
| // task partitions are finished (COMPLETED or ERROR), update the status update of the original |
| // scheduler message, and then remove the partitions from the ideal state |
| if (idealState != null |
| && idealState.getStateModelDefRef().equalsIgnoreCase( |
| DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { |
| updateScheduledTaskStatus(view, manager, idealState); |
| } |
| } |
| } |
| |
| private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, |
| IdealState taskQueueIdealState) { |
| HelixDataAccessor accessor = manager.getHelixDataAccessor(); |
| ZNRecord finishedTasks = new ZNRecord(ev.getResourceName()); |
| |
| // Place holder for finished partitions |
| Map<String, String> emptyMap = new HashMap<String, String>(); |
| List<String> emptyList = new LinkedList<String>(); |
| |
| Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>(); |
| Map<String, Map<String, String>> controllerMsgUpdates = |
| new HashMap<String, Map<String, String>>(); |
| |
| Builder keyBuilder = accessor.keyBuilder(); |
| |
| for (String taskPartitionName : ev.getPartitionSet()) { |
| for (String taskState : ev.getStateMap(taskPartitionName).values()) { |
| if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) || taskState |
| .equalsIgnoreCase("COMPLETED")) { |
| LogUtil.logInfo(LOG, _eventId, taskPartitionName + " finished as " + taskState); |
| finishedTasks.getListFields().put(taskPartitionName, emptyList); |
| finishedTasks.getMapFields().put(taskPartitionName, emptyMap); |
| |
| // Update original scheduler message status update |
| if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) { |
| String controllerMsgId = taskQueueIdealState.getRecord().getMapField(taskPartitionName) |
| .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID); |
| if (controllerMsgId != null) { |
| LogUtil.logInfo(LOG, _eventId, |
| taskPartitionName + " finished with controllerMsg " + controllerMsgId); |
| if (!controllerMsgUpdates.containsKey(controllerMsgId)) { |
| controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>()); |
| } |
| controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState); |
| } |
| } |
| } |
| } |
| } |
| // fill the controllerMsgIdCountMap |
| for (String taskId : taskQueueIdealState.getPartitionSet()) { |
| String controllerMsgId = |
| taskQueueIdealState.getRecord().getMapField(taskId) |
| .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID); |
| if (controllerMsgId != null) { |
| if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) { |
| controllerMsgIdCountMap.put(controllerMsgId, 0); |
| } |
| controllerMsgIdCountMap.put(controllerMsgId, |
| (controllerMsgIdCountMap.get(controllerMsgId) + 1)); |
| } |
| } |
| |
| if (controllerMsgUpdates.size() > 0) { |
| for (String controllerMsgId : controllerMsgUpdates.keySet()) { |
| PropertyKey controllerStatusUpdateKey = |
| keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), controllerMsgId); |
| StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey); |
| for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) { |
| Map<String, String> result = new HashMap<String, String>(); |
| result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName)); |
| controllerStatusUpdate.getRecord().setMapField( |
| "MessageResult " |
| + taskQueueIdealState.getRecord().getMapField(taskPartitionName) |
| .get(Message.Attributes.TGT_NAME.toString()) |
| + " " |
| + taskPartitionName |
| + " " |
| + taskQueueIdealState.getRecord().getMapField(taskPartitionName) |
| .get(Message.Attributes.MSG_ID.toString()), result); |
| } |
| // All done for the scheduled tasks that came from controllerMsgId, add summary for it |
| Integer controllerMsgIdCount = controllerMsgIdCountMap.get(controllerMsgId); |
| if (controllerMsgIdCount != null |
| && controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCount.intValue()) { |
| int finishedTasksNum = 0; |
| int completedTasksNum = 0; |
| for (String key : controllerStatusUpdate.getRecord().getMapFields().keySet()) { |
| if (key.startsWith("MessageResult ")) { |
| finishedTasksNum++; |
| } |
| if (controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null) { |
| if (controllerStatusUpdate.getRecord().getMapField(key).get("Result") |
| .equalsIgnoreCase("COMPLETED")) { |
| completedTasksNum++; |
| } |
| } |
| } |
| Map<String, String> summary = new TreeMap<String, String>(); |
| summary.put("TotalMessages:", "" + finishedTasksNum); |
| summary.put("CompletedMessages", "" + completedTasksNum); |
| |
| controllerStatusUpdate.getRecord().setMapField("Summary", summary); |
| } |
| // Update the statusUpdate of controllerMsgId |
| accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate); |
| } |
| } |
| |
| if (finishedTasks.getListFields().size() > 0) { |
| ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, ZNRecordDelta.MergeOperation.SUBTRACT); |
| List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>(); |
| deltaList.add(znDelta); |
| IdealState delta = new IdealState(taskQueueIdealState.getResourceName()); |
| delta.setDeltaList(deltaList); |
| |
| // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate |
| keyBuilder = accessor.keyBuilder(); |
| accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta); |
| } |
| } |
| |
| } |