blob: 478a8bed13c02630603f41504d27e47ec1d4c0a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler.resource.normalization;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.daemon.Acker;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.utils.ObjectReader;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A resource request with normalized resource names.
*/
public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
private final NormalizedResources normalizedResources;
private double onHeap;
private double offHeap;
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
Map<String, Double> defaultResources) {
if (resources == null && defaultResources == null) {
onHeap = 0.0;
offHeap = 0.0;
normalizedResources = new NormalizedResources();
} else {
Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER
.normalizedResourceMap(defaultResources);
normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
normalizedResources = new NormalizedResources(normalizedResourceMap);
}
}
public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf, String componentId) {
this(parseResources(component.get_json_conf()), getDefaultResources(topoConf, componentId));
}
public NormalizedResourceRequest(Map<String, Object> topoConf, String componentId) {
this((Map<String, ? extends Number>) null, getDefaultResources(topoConf, componentId));
}
public NormalizedResourceRequest() {
this((Map<String, ? extends Number>) null, null);
}
private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
if (!dest.containsKey(destKey)) {
Number value = (Number) src.get(srcKey);
if (value != null) {
dest.put(destKey, value.doubleValue());
}
}
}
private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf, String componentId) {
Map<String, Double> ret =
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
// Some components might have different resource configs.
if (componentId != null) {
if (componentId.equals(Acker.ACKER_COMPONENT_ID)) {
if (topoConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)) {
ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)));
}
if (topoConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)) {
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)));
}
if (topoConf.containsKey(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)) {
ret.put(Constants.COMMON_CPU_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)));
}
} else if (componentId.startsWith(Constants.METRICS_COMPONENT_ID_PREFIX)) {
if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)) {
ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)));
}
if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)) {
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)));
}
if (topoConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)) {
ret.put(Constants.COMMON_CPU_RESOURCE_NAME,
ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)));
}
}
}
putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
return ret;
}
private static Map<String, Double> parseResources(String input) {
Map<String, Double> topologyResources = new HashMap<>();
JSONParser parser = new JSONParser();
LOG.debug("Input to parseResources {}", input);
try {
if (input != null) {
Object obj = parser.parse(input);
JSONObject jsonObject = (JSONObject) obj;
// Legacy resource parsing
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
Double topoMemOnHeap = ObjectReader
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
}
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
Double topoMemOffHeap = ObjectReader
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
}
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
}
// If resource is also present in resources map will overwrite the above
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
Map<String, Number> rawResourcesMap =
(Map<String, Number>) jsonObject.computeIfAbsent(
Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
topologyResources.put(
stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
}
}
}
} catch (ParseException e) {
LOG.error("Failed to parse component resources is:" + e.toString(), e);
return null;
}
return topologyResources;
}
/**
* Convert to a map that is used by configuration and the UI.
* @return a map with the key as the resource name and the value the resource amount.
*/
public Map<String, Double> toNormalizedMap() {
Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
return ret;
}
public double getOnHeapMemoryMb() {
return onHeap;
}
public void addOnHeap(final double onHeap) {
this.onHeap += onHeap;
}
public double getOffHeapMemoryMb() {
return offHeap;
}
public void addOffHeap(final double offHeap) {
this.offHeap += offHeap;
}
/**
* Add the resources in other to this.
*
* @param other the other Request to add to this.
*/
public void add(NormalizedResourceRequest other) {
this.normalizedResources.add(other.normalizedResources);
onHeap += other.onHeap;
offHeap += other.offHeap;
}
/**
* Add the resources from a worker to those in this.
* @param value the resources on the worker.
*/
public void add(WorkerResources value) {
this.normalizedResources.add(value);
//The resources are already normalized
Map<String, Double> resources = value.get_resources();
onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
}
@Override
public double getTotalMemoryMb() {
return onHeap + offHeap;
}
@Override
public String toString() {
return "Normalized resources: " + toNormalizedMap();
}
public double getTotalCpu() {
return this.normalizedResources.getTotalCpu();
}
@Override
public NormalizedResources getNormalizedResources() {
return this.normalizedResources;
}
@Override
public void clear() {
normalizedResources.clear();
offHeap = 0.0;
onHeap = 0.0;
}
@Override
public boolean areAnyOverZero() {
return onHeap > 0 || offHeap > 0 || normalizedResources.areAnyOverZero();
}
}