| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm.scheduler.resource.normalization; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import org.apache.storm.Config; |
| import org.apache.storm.Constants; |
| import org.apache.storm.daemon.Acker; |
| import org.apache.storm.generated.ComponentCommon; |
| import org.apache.storm.generated.WorkerResources; |
| import org.apache.storm.utils.ObjectReader; |
| import org.json.simple.JSONObject; |
| import org.json.simple.parser.JSONParser; |
| import org.json.simple.parser.ParseException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A resource request with normalized resource names. |
| */ |
| public class NormalizedResourceRequest implements NormalizedResourcesWithMemory { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class); |
| private final NormalizedResources normalizedResources; |
| private double onHeap; |
| private double offHeap; |
| |
| private NormalizedResourceRequest(Map<String, ? extends Number> resources, |
| Map<String, Double> defaultResources) { |
| if (resources == null && defaultResources == null) { |
| onHeap = 0.0; |
| offHeap = 0.0; |
| normalizedResources = new NormalizedResources(); |
| } else { |
| Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER |
| .normalizedResourceMap(defaultResources); |
| normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources)); |
| onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); |
| offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); |
| normalizedResources = new NormalizedResources(normalizedResourceMap); |
| } |
| } |
| |
| public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf, String componentId) { |
| this(parseResources(component.get_json_conf()), getDefaultResources(topoConf, componentId)); |
| } |
| |
| public NormalizedResourceRequest(Map<String, Object> topoConf, String componentId) { |
| this((Map<String, ? extends Number>) null, getDefaultResources(topoConf, componentId)); |
| } |
| |
| public NormalizedResourceRequest() { |
| this((Map<String, ? extends Number>) null, null); |
| } |
| |
| private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) { |
| if (!dest.containsKey(destKey)) { |
| Number value = (Number) src.get(srcKey); |
| if (value != null) { |
| dest.put(destKey, value.doubleValue()); |
| } |
| } |
| } |
| |
| private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf, String componentId) { |
| Map<String, Double> ret = |
| NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault( |
| Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>())); |
| |
| // Some components might have different resource configs. |
| if (componentId != null) { |
| if (componentId.equals(Acker.ACKER_COMPONENT_ID)) { |
| if (topoConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)) { |
| ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB))); |
| } |
| if (topoConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)) { |
| ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB))); |
| } |
| if (topoConf.containsKey(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)) { |
| ret.put(Constants.COMMON_CPU_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT))); |
| } |
| } else if (componentId.startsWith(Constants.METRICS_COMPONENT_ID_PREFIX)) { |
| if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)) { |
| ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB))); |
| } |
| if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)) { |
| ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB))); |
| } |
| if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)) { |
| ret.put(Constants.COMMON_CPU_RESOURCE_NAME, |
| ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT))); |
| } |
| } |
| } |
| |
| putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); |
| putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); |
| putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); |
| return ret; |
| } |
| |
| private static Map<String, Double> parseResources(String input) { |
| Map<String, Double> topologyResources = new HashMap<>(); |
| JSONParser parser = new JSONParser(); |
| LOG.debug("Input to parseResources {}", input); |
| try { |
| if (input != null) { |
| Object obj = parser.parse(input); |
| JSONObject jsonObject = (JSONObject) obj; |
| |
| // Legacy resource parsing |
| if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { |
| Double topoMemOnHeap = ObjectReader |
| .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); |
| topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap); |
| } |
| if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { |
| Double topoMemOffHeap = ObjectReader |
| .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); |
| topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap); |
| } |
| if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { |
| Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), |
| null); |
| topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu); |
| } |
| |
| // If resource is also present in resources map will overwrite the above |
| if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { |
| Map<String, Number> rawResourcesMap = |
| (Map<String, Number>) jsonObject.computeIfAbsent( |
| Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>()); |
| |
| for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) { |
| topologyResources.put( |
| stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue()); |
| } |
| |
| } |
| } |
| } catch (ParseException e) { |
| LOG.error("Failed to parse component resources is:" + e.toString(), e); |
| return null; |
| } |
| return topologyResources; |
| } |
| |
| /** |
| * Convert to a map that is used by configuration and the UI. |
| * @return a map with the key as the resource name and the value the resource amount. |
| */ |
| public Map<String, Double> toNormalizedMap() { |
| Map<String, Double> ret = this.normalizedResources.toNormalizedMap(); |
| ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap); |
| ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap); |
| return ret; |
| } |
| |
| public double getOnHeapMemoryMb() { |
| return onHeap; |
| } |
| |
| public void addOnHeap(final double onHeap) { |
| this.onHeap += onHeap; |
| } |
| |
| public double getOffHeapMemoryMb() { |
| return offHeap; |
| } |
| |
| public void addOffHeap(final double offHeap) { |
| this.offHeap += offHeap; |
| } |
| |
| /** |
| * Add the resources in other to this. |
| * |
| * @param other the other Request to add to this. |
| */ |
| public void add(NormalizedResourceRequest other) { |
| this.normalizedResources.add(other.normalizedResources); |
| onHeap += other.onHeap; |
| offHeap += other.offHeap; |
| } |
| |
| /** |
| * Add the resources from a worker to those in this. |
| * @param value the resources on the worker. |
| */ |
| public void add(WorkerResources value) { |
| this.normalizedResources.add(value); |
| //The resources are already normalized |
| Map<String, Double> resources = value.get_resources(); |
| onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0); |
| offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0); |
| } |
| |
| @Override |
| public double getTotalMemoryMb() { |
| return onHeap + offHeap; |
| } |
| |
| @Override |
| public String toString() { |
| return "Normalized resources: " + toNormalizedMap(); |
| } |
| |
| public double getTotalCpu() { |
| return this.normalizedResources.getTotalCpu(); |
| } |
| |
| @Override |
| public NormalizedResources getNormalizedResources() { |
| return this.normalizedResources; |
| } |
| |
| @Override |
| public void clear() { |
| normalizedResources.clear(); |
| offHeap = 0.0; |
| onHeap = 0.0; |
| } |
| |
| @Override |
| public boolean areAnyOverZero() { |
| return onHeap > 0 || offHeap > 0 || normalizedResources.areAnyOverZero(); |
| } |
| } |