blob: 80f4faf0b70f45a38c3a5d8027079294242383be [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.loadgen;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.utils.ObjectReader;
/**
* Configuration for a simulated spout.
*/
public class LoadCompConf {
public final String id;
public final int parallelism;
public final List<OutputStream> streams;
public final double cpuLoad;
public final double memoryLoad;
public final SlowExecutorPattern slp;
/**
* Parse the LoadCompConf from a config Map.
* @param conf the map holding the config for a LoadCompConf.
* @return the parsed object.
*/
public static LoadCompConf fromConf(Map<String, Object> conf) {
String id = (String) conf.get("id");
int parallelism = ObjectReader.getInt(conf.get("parallelism"), 1);
List<OutputStream> streams = new ArrayList<>();
List<Map<String, Object>> streamData = (List<Map<String, Object>>) conf.get("streams");
if (streamData != null) {
for (Map<String, Object> streamInfo: streamData) {
streams.add(OutputStream.fromConf(streamInfo));
}
}
double memoryMb = ObjectReader.getDouble(conf.get("memoryLoad"), 0.0);
double cpuPercent = ObjectReader.getDouble(conf.get("cpuLoad"), 0.0);
SlowExecutorPattern slp = null;
if (conf.containsKey("slowExecutorPattern")) {
slp = SlowExecutorPattern.fromConf((Map<String, Object>) conf.get("slowExecutorPattern"));
}
return new LoadCompConf(id, parallelism, streams, memoryMb, cpuPercent, slp);
}
/**
* Build a config map for this object.
* @return the config map.
*/
public Map<String, Object> toConf() {
Map<String, Object> ret = new HashMap<>();
ret.put("id", id);
ret.put("parallelism", parallelism);
if (memoryLoad > 0) {
ret.put("memoryLoad", memoryLoad);
}
if (cpuLoad > 0) {
ret.put("cpuLoad", cpuLoad);
}
if (streams != null) {
List<Map<String, Object>> streamData = new ArrayList<>();
for (OutputStream out : streams) {
streamData.add(out.toConf());
}
ret.put("streams", streamData);
}
if (slp != null) {
ret.put("slowExecutorPattern", slp.toConf());
}
return ret;
}
/**
* Chenge the name of components and streams according to the parameters passed in.
* @param remappedComponents original component name to new component name.
* @param remappedStreams original stream id to new stream id.
* @return a copy of this with the values remapped.
*/
public LoadCompConf remap(Map<String, String> remappedComponents, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
String remappedId = remappedComponents.get(id);
List<OutputStream> remappedOutStreams = (streams == null) ? null :
streams.stream()
.map((orig) -> orig.remap(id, remappedStreams))
.collect(Collectors.toList());
return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad, slp);
}
/**
* Scale the parallelism of this component by v. The aggregate throughput will be the same.
* The parallelism will be rounded up to the next largest whole number. Parallelism will always be at least 1.
* @param v 1.0 is not change 0.5 is drop the parallelism by half.
* @return a copy of this with the parallelism adjusted.
*/
public LoadCompConf scaleParallel(double v) {
return setParallel(Math.max(1, (int)Math.ceil(parallelism * v)));
}
/**
* Set the parallelism of this component, and adjust the throughput so in aggregate it stays the same.
* @param newParallelism the new parallelism to set.
* @return a copy of this with the adjustments made.
*/
public LoadCompConf setParallel(int newParallelism) {
//We need to adjust the throughput accordingly (so that it stays the same in aggregate)
double throughputAdjustment = ((double)parallelism) / newParallelism;
return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad, slp).scaleThroughput(throughputAdjustment);
}
/**
* Scale the throughput of this component.
* @param v 1.0 is unchanged 0.5 will cut the throughput in half.
* @return a copy of this with the adjustments made.
*/
public LoadCompConf scaleThroughput(double v) {
if (streams != null) {
List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad, slp);
} else {
return this;
}
}
/**
* Override the SlowExecutorPattern with a new one.
* @param slp the new pattern or null if you don't want it to change
* @return a copy of this with the adjustments made.
*/
public LoadCompConf overrideSlowExecutorPattern(SlowExecutorPattern slp) {
if (slp != null) {
return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
} else {
return this;
}
}
/**
* Compute the total amount of all messages emitted in all streams per second.
* @return the sum of all messages emitted per second.
*/
public double getAllEmittedAggregate() {
double ret = 0;
if (streams != null) {
for (OutputStream out: streams) {
if (out.rate != null) {
ret += out.rate.mean * parallelism;
}
}
}
return ret;
}
public static class Builder {
private String id;
private int parallelism = 1;
private List<OutputStream> streams;
private double cpuLoad = 0.0;
private double memoryLoad = 0.0;
private SlowExecutorPattern slp = null;
public String getId() {
return id;
}
public Builder withId(String id) {
this.id = id;
return this;
}
public int getParallelism() {
return parallelism;
}
public Builder withParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public List<OutputStream> getStreams() {
return streams;
}
/**
* Add in a single OutputStream to this component.
* @param stream the stream to add
* @return this
*/
public Builder withStream(OutputStream stream) {
if (streams == null) {
streams = new ArrayList<>();
}
streams.add(stream);
return this;
}
public Builder withStreams(List<OutputStream> streams) {
this.streams = streams;
return this;
}
public Builder withCpuLoad(double cpuLoad) {
this.cpuLoad = cpuLoad;
return this;
}
public Builder withMemoryLoad(double memoryLoad) {
this.memoryLoad = memoryLoad;
return this;
}
public Builder withSlowExecutorPattern(SlowExecutorPattern slp) {
this.slp = slp;
return this;
}
public LoadCompConf build() {
return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
}
}
/**
* Create a new LoadCompConf with the given values.
* @param id the id of the component.
* @param parallelism tha parallelism of the component.
* @param streams the output streams of the component.
*/
public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad,
SlowExecutorPattern slp) {
this.id = id;
if (id == null) {
throw new IllegalArgumentException("A spout ID cannot be null");
}
this.parallelism = parallelism;
this.streams = streams;
this.cpuLoad = cpuLoad;
this.memoryLoad = memoryLoad;
this.slp = slp;
}
}