blob: 366995e62cc52c2c938a42bb805fccaab961729f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.scheduler.resource;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ResourceUtils {
private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
public static NormalizedResourceRequest getBoltResources(StormTopology topology, Map<String, Object> topologyConf,
String componentId) {
if (topology.get_bolts() != null) {
Bolt bolt = topology.get_bolts().get(componentId);
return new NormalizedResourceRequest(bolt.get_common(), topologyConf, componentId);
}
return null;
}
public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
Map<String, NormalizedResourceRequest> boltResources = new HashMap<>();
if (topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(bolt.getValue().get_common(),
topologyConf, bolt.getKey());
if (LOG.isTraceEnabled()) {
LOG.trace("Turned {} into {}", bolt.getValue().get_common().get_json_conf(), topologyResources);
}
boltResources.put(bolt.getKey(), topologyResources);
}
}
return boltResources;
}
public static NormalizedResourceRequest getSpoutResources(StormTopology topology,
Map<String, Object> topologyConf, String componentId) {
if (topology.get_spouts() != null) {
SpoutSpec spout = topology.get_spouts().get(componentId);
return new NormalizedResourceRequest(spout.get_common(), topologyConf, componentId);
}
return null;
}
public static Map<String, NormalizedResourceRequest> getSpoutsResources(StormTopology topology,
Map<String, Object> topologyConf) {
Map<String, NormalizedResourceRequest> spoutResources = new HashMap<>();
if (topology.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(spout.getValue().get_common(),
topologyConf, spout.getKey());
if (LOG.isTraceEnabled()) {
LOG.trace("Turned {} into {}", spout.getValue().get_common().get_json_conf(), topologyResources);
}
spoutResources.put(spout.getKey(), topologyResources);
}
}
return spoutResources;
}
public static void updateStormTopologyResources(StormTopology topology, Map<String, Map<String, Double>> resourceUpdatesMap) {
Map<String, Map<String, Double>> componentsUpdated = new HashMap<>();
if (topology.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
SpoutSpec spoutSpec = spout.getValue();
String spoutName = spout.getKey();
if (resourceUpdatesMap.containsKey(spoutName)) {
ComponentCommon spoutCommon = spoutSpec.get_common();
Map<String, Double> resourcesUpdate = NormalizedResources
.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(spoutName));
String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
spoutCommon.set_json_conf(newJsonConf);
componentsUpdated.put(spoutName, resourcesUpdate);
}
}
}
if (topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
Bolt boltObj = bolt.getValue();
String boltName = bolt.getKey();
if (resourceUpdatesMap.containsKey(boltName)) {
ComponentCommon boltCommon = boltObj.get_common();
Map<String, Double> resourcesUpdate = NormalizedResources
.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(boltName));
String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
boltCommon.set_json_conf(newJsonConf);
componentsUpdated.put(boltName, resourcesUpdate);
}
}
}
LOG.info("Component resources updated: {}", componentsUpdated);
Map<String, Map<String, Double>> notUpdated = new HashMap<String, Map<String, Double>>();
for (String component : resourceUpdatesMap.keySet()) {
if (!componentsUpdated.containsKey(component)) {
notUpdated.put(component, resourceUpdatesMap.get(component));
}
}
LOG.info("Component resource updates ignored: {}", notUpdated);
}
public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
for (Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) {
if (entry.getValue().equals(normalizedResourceName)) {
return entry.getKey();
}
}
return normalizedResourceName;
}
public static String getJsonWithUpdatedResources(String jsonConf, Map<String, Double> resourceUpdates) {
try {
JSONParser parser = new JSONParser();
Object obj = parser.parse(jsonConf);
JSONObject jsonObject = (JSONObject) obj;
Map<String, Double> componentResourceMap =
(Map<String, Double>) jsonObject.getOrDefault(
Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<String, Double>()
);
for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) {
// if there will be legacy values they will be in the outer conf
jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
}
componentResourceMap.put(resourceUpdateEntry.getKey(), resourceUpdateEntry.getValue());
}
jsonObject.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, componentResourceMap);
return jsonObject.toJSONString();
} catch (ParseException ex) {
throw new RuntimeException("Failed to parse component resources with json: " + jsonConf);
}
}
}