blob: 0918bc30ebb4e6d821caa91378eb2154231bf144 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.profiler.log;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.costs.EstimationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.AtomicExecution;
import org.apache.wayang.core.platform.AtomicExecutionGroup;
import org.apache.wayang.core.platform.PartialExecution;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.profiling.ExecutionLog;
import org.apache.wayang.core.util.Bitmask;
import org.apache.wayang.core.util.Formats;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres;
import org.apache.wayang.spark.Spark;
import org.apache.wayang.sqlite3.Sqlite3;
/**
 * This app tries to infer good {@link LoadProfileEstimator}s for {@link ExecutionOperator}s using data from an
 * {@link ExecutionLog}.
 * <p>
 * The constructor loads the {@link PartialExecution}s from the {@link ExecutionLog}, filters them according to the
 * {@code wayang.profiler.ga.*} configuration properties, optionally bins them by execution time, and instruments
 * their estimators. {@link #run()} then fits the resulting {@link Variable}s with a {@link GeneticOptimizer} and
 * prints (and optionally writes to a file) the learned configuration.
 * </p>
 */
public class GeneticOptimizerApp {

    private static final Logger logger = LogManager.getLogger(GeneticOptimizerApp.class);

    /**
     * {@link Configuration} to be used.
     */
    final Configuration configuration;

    /**
     * Maintains {@link Variable}s to be optimized.
     */
    OptimizationSpace optimizationSpace;

    /**
     * Maintains {@link PartialExecution}s as training data.
     */
    List<PartialExecution> partialExecutions;

    /**
     * The {@link #partialExecutions} grouped by their {@link LoadProfileEstimator} template keys, ordered by the
     * number of distinct template keys per group (ascending).
     */
    private final List<List<PartialExecution>> partialExecutionGroups;

    /**
     * Maintains a {@link DynamicLoadProfileEstimator} for every {@link Configuration} key in the
     * {@link #partialExecutions}.
     */
    Map<String, DynamicLoadProfileEstimator> estimators;

    /**
     * Maintains variables that quantify the overhead for initializing a {@link Platform}.
     */
    Map<Platform, Variable> platformOverheads = new HashMap<>();

    /**
     * Creates a new instance.
     *
     * @param configuration provides, amongst others, platform specifications
     * @throws WayangException if the {@link ExecutionLog} could not be read or evaluated
     */
    public GeneticOptimizerApp(Configuration configuration) {
        this.configuration = configuration;

        // Initialize platforms.
        Java.platform();
        Spark.platform();
        Sqlite3.platform();
        Postgres.platform();
        //TODO: add Graphchi dynamically if the Scala version is 2.11

        // Load the ExecutionLog.
        double samplingFactor = this.configuration.getDoubleProperty("wayang.profiler.ga.sampling", 1d);
        double maxCardinalitySpread = this.configuration.getDoubleProperty("wayang.profiler.ga.max-cardinality-spread", 1d);
        double minCardinalityConfidence = this.configuration.getDoubleProperty("wayang.profiler.ga.min-cardinality-confidence", 1d);
        long minExecutionTime = this.configuration.getLongProperty("wayang.profiler.ga.min-exec-time", 1);
        try (ExecutionLog executionLog = ExecutionLog.open(configuration)) {
            // Collect into an explicitly mutable list: it is pruned below via removeIf/clear/remove.
            this.partialExecutions = executionLog.stream().collect(Collectors.toCollection(ArrayList::new));
            int lastSize = this.partialExecutions.size();
            this.partialExecutions.removeIf(partialExecution -> !this.checkEstimatorTemplates(partialExecution));
            int newSize = this.partialExecutions.size();
            System.out.printf("Removed %d executions with no template-based estimators.\n", lastSize - newSize);
            lastSize = newSize;
            this.partialExecutions.removeIf(partialExecution -> !this.checkSpread(partialExecution, maxCardinalitySpread));
            newSize = this.partialExecutions.size();
            // Report the spread threshold that was actually applied.
            System.out.printf("Removed %d executions with a too large cardinality spread (> %.2f).\n", lastSize - newSize, maxCardinalitySpread);
            lastSize = newSize;
            this.partialExecutions.removeIf(partialExecution -> !this.checkNonEmptyCardinalities(partialExecution));
            newSize = this.partialExecutions.size();
            System.out.printf("Removed %d executions with zero cardinalities.\n", lastSize - newSize);
            lastSize = newSize;
            this.partialExecutions.removeIf(partialExecution -> !this.checkConfidence(partialExecution, minCardinalityConfidence));
            newSize = this.partialExecutions.size();
            System.out.printf("Removed %d executions with a too low cardinality confidence (< %.2f).\n", lastSize - newSize, minCardinalityConfidence);
            lastSize = newSize;
            this.partialExecutions.removeIf(partialExecution -> partialExecution.getMeasuredExecutionTime() < minExecutionTime);
            newSize = this.partialExecutions.size();
            System.out.printf("Removed %d executions with a too short runtime (< %,d ms).\n", lastSize - newSize, minExecutionTime);
            lastSize = newSize;
            // Keep each execution with probability samplingFactor; one generator suffices for the whole pass.
            final Random random = new Random();
            this.partialExecutions.removeIf(partialExecution -> random.nextDouble() > samplingFactor);
            newSize = this.partialExecutions.size();
            System.out.printf("Removed %d executions due to sampling.\n", lastSize - newSize);
        } catch (Exception e) {
            throw new WayangException("Could not evaluate execution log.", e);
        }

        // Group the PartialExecutions.
        this.partialExecutionGroups = this.groupPartialExecutions(this.partialExecutions).entrySet().stream()
                .sorted(Comparator.comparingInt(e -> e.getKey().size()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());

        // Apply binning if requested.
        double binningStretch = this.configuration.getDoubleProperty("wayang.profiler.ga.binning", 1.1d);
        if (binningStretch > 1d) {
            System.out.print("Applying binning... ");
            int numOriginalPartialExecutions = this.partialExecutions.size();
            this.partialExecutions.clear();
            for (List<PartialExecution> group : this.partialExecutionGroups) {
                final Collection<PartialExecution> reducedGroup = this.binByExecutionTime(group, binningStretch);
                // Keep the groups in sync with the reduced training data.
                group.retainAll(reducedGroup);
                this.partialExecutions.addAll(reducedGroup);
            }
            System.out.printf(
                    "reduced the number of partial executions from %d to %d.\n",
                    numOriginalPartialExecutions, this.partialExecutions.size()
            );
        }

        // Initialize the optimization space with its LoadProfileEstimators and associated Variables.
        this.optimizationSpace = new OptimizationSpace();
        this.estimators = new HashMap<>();
        this.platformOverheads = new HashMap<>();
        for (PartialExecution partialExecution : this.partialExecutions) {
            // Instrument the partial executions.
            for (AtomicExecutionGroup executionGroup : partialExecution.getAtomicExecutionGroups()) {
                for (AtomicExecution atomicExecution : executionGroup.getAtomicExecutions()) {
                    this.instrument(atomicExecution);
                }
            }
            // Register one overhead variable per initialized platform.
            for (Platform platform : partialExecution.getInitializedPlatforms()) {
                this.platformOverheads.computeIfAbsent(
                        platform,
                        key -> this.optimizationSpace.getOrCreateVariable(key.getClass().getCanonicalName() + "->overhead")
                );
            }
        }
        System.out.printf(
                "Loaded %d execution records with %d template-based estimators types and %d platform overheads.\n",
                this.partialExecutions.size(), this.estimators.keySet().size(), this.platformOverheads.size()
        );
    }

    /**
     * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} are sufficiently confident.
     * {@code null} estimates are ignored.
     *
     * @param partialExecution         whose {@link CardinalityEstimate}s should be checked
     * @param minCardinalityConfidence the minimum confidence
     * @return whether the {@link CardinalityEstimate}s are sufficiently confident
     */
    private boolean checkConfidence(PartialExecution partialExecution, double minCardinalityConfidence) {
        return partialExecution.getAtomicExecutionGroups().stream().allMatch(
                executionGroup -> {
                    final EstimationContext estimationContext = executionGroup.getEstimationContext();
                    for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getCorrectnessProbability() < minCardinalityConfidence) return false;
                    }
                    for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getCorrectnessProbability() < minCardinalityConfidence) return false;
                    }
                    return true;
                }
        );
    }

    /**
     * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} have a non-zero upper estimate.
     * {@code null} estimates are ignored.
     *
     * @param partialExecution whose {@link CardinalityEstimate}s should be checked
     * @return whether none of the {@link CardinalityEstimate}s has an upper estimate of zero
     */
    private boolean checkNonEmptyCardinalities(PartialExecution partialExecution) {
        return partialExecution.getAtomicExecutionGroups().stream().allMatch(
                executionGroup -> {
                    final EstimationContext estimationContext = executionGroup.getEstimationContext();
                    for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getUpperEstimate() == 0) {
                            return false;
                        }
                    }
                    for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getUpperEstimate() == 0) {
                            return false;
                        }
                    }
                    return true;
                }
        );
    }

    /**
     * Check if this {@link PartialExecution} contains any template-based {@link LoadProfileEstimator}s.
     *
     * @param partialExecution that should be checked
     * @return whether the {@code partialExecution} contains at least one template
     */
    private boolean checkEstimatorTemplates(PartialExecution partialExecution) {
        return !this.getLoadProfileEstimatorTemplateKeys(partialExecution).isEmpty();
    }

    /**
     * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} are sufficiently narrow, i.e.,
     * {@code upper <= lower * maxCardinalitySpread}. {@code null} estimates are ignored.
     *
     * @param partialExecution     whose {@link CardinalityEstimate}s should be checked
     * @param maxCardinalitySpread the maximum spread of the {@link CardinalityEstimate}s
     * @return whether the {@link CardinalityEstimate}s are sufficiently narrow
     */
    private boolean checkSpread(PartialExecution partialExecution, double maxCardinalitySpread) {
        return partialExecution.getAtomicExecutionGroups().stream().allMatch(
                executionGroup -> {
                    final EstimationContext estimationContext = executionGroup.getEstimationContext();
                    for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getLowerEstimate() * maxCardinalitySpread < cardinality.getUpperEstimate()) {
                            return false;
                        }
                    }
                    for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) {
                        if (cardinality == null) continue;
                        if (cardinality.getLowerEstimate() * maxCardinalitySpread < cardinality.getUpperEstimate()) {
                            return false;
                        }
                    }
                    return true;
                }
        );
    }

    /**
     * Wrap the {@link LoadProfileEstimator} of the given {@link AtomicExecution} with a
     * {@link DynamicLoadProfileEstimator} and register every (nested) {@link DynamicLoadProfileEstimator} that has a
     * configuration key in {@link #estimators}.
     *
     * @param atomicExecution that should be instrumented
     */
    private void instrument(AtomicExecution atomicExecution) {
        final DynamicLoadProfileEstimator dynamicLoadProfileEstimator = DynamicLoadProfileEstimators.createEstimatorFor(
                atomicExecution.getLoadProfileEstimator(),
                this.configuration,
                this.optimizationSpace
        );
        atomicExecution.setLoadProfileEstimator(dynamicLoadProfileEstimator);

        // Keep track of the estimators (breadth-first over the nested estimators).
        Queue<LoadProfileEstimator> instrumentedEstimators = new LinkedList<>();
        instrumentedEstimators.add(dynamicLoadProfileEstimator);
        while (!instrumentedEstimators.isEmpty()) {
            final LoadProfileEstimator estimator = instrumentedEstimators.poll();
            if (estimator instanceof DynamicLoadProfileEstimator && estimator.getConfigurationKey() != null) {
                this.estimators.put(estimator.getConfigurationKey(), (DynamicLoadProfileEstimator) estimator);
            }
            instrumentedEstimators.addAll(estimator.getNestedEstimators());
        }
    }

    /**
     * Runs the genetic optimization: optionally on the individual {@link #partialExecutionGroups} first
     * ({@code wayang.profiler.ga.blocking}), then repeatedly on all training data, dropping badly explained
     * {@link PartialExecution}s between rounds ({@code wayang.profiler.ga.noise-filter.*}). Finally prints the result
     * and, if {@code wayang.profiler.ga.output-file} is set, writes the learned configuration to that file.
     * Terminates the JVM via {@link System#exit(int)} if there is nothing to optimize.
     */
    public void run() {
        if (this.optimizationSpace.getNumDimensions() == 0) {
            System.out.println("There is nothing to optimize - all estimators are specified in the configuration.");
            System.exit(0);
        }

        // Initialize from the configuration.
        long timeLimit = this.configuration.getLongProperty("wayang.profiler.ga.timelimit.ms", -1);
        long stopMillis = timeLimit > 0 ? System.currentTimeMillis() + timeLimit : -1L;
        int maxGen = (int) this.configuration.getLongProperty("wayang.profiler.ga.maxgenerations", 5000);
        int maxStableGen = (int) this.configuration.getLongProperty("wayang.profiler.ga.maxstablegenerations", 2000);
        double minFitness = this.configuration.getDoubleProperty("wayang.profiler.ga.minfitness", .0d);
        int superOptimizations = (int) this.configuration.getLongProperty("wayang.profiler.ga.superoptimizations", 3);
        boolean isBlocking = this.configuration.getBooleanProperty("wayang.profiler.ga.blocking", false);
        long maxPartialExecutionRemovals = this.configuration.getLongProperty("wayang.profiler.ga.noise-filter.max", 3);
        double partialExecutionRemovalThreshold = this.configuration.getDoubleProperty("wayang.profiler.ga.noise-filter.threshold", 2);

        // Create the root optimizer and an initial population.
        GeneticOptimizer generalOptimizer = this.createOptimizer(this.partialExecutions);
        List<Individual> population = generalOptimizer.createInitialPopulation();
        int generation = 0;

        // Optimize on blocks.
        if (isBlocking) {
            for (List<PartialExecution> group : this.partialExecutionGroups) {
                final PartialExecution representative = WayangCollections.getAny(group);
                final Set<String> templateKeys = this.getLoadProfileEstimatorTemplateKeys(representative);
                if (group.size() < 2) {
                    System.out.printf("Few measurement points for %s\n", templateKeys);
                }
                if (representative.getAtomicExecutionGroups().size() > 3) {
                    System.out.printf("Many subjects for %s\n", templateKeys);
                }
                long minExecTime = group.stream().mapToLong(PartialExecution::getMeasuredExecutionTime).min().getAsLong();
                long maxExecTime = group.stream().mapToLong(PartialExecution::getMeasuredExecutionTime).max().getAsLong();
                // Skip groups whose measured runtimes span less than one second.
                if (maxExecTime - minExecTime < 1000) {
                    System.out.printf("Narrow training data for %s\n", templateKeys);
                    continue;
                }
                final Tuple<Integer, List<Individual>> newGeneration = this.superOptimize(
                        superOptimizations, population, group, generation, maxGen, maxStableGen, minFitness, stopMillis
                );
                generation = newGeneration.getField0();
                population = newGeneration.getField1();

                final GeneticOptimizer tempOptimizer = this.createOptimizer(group);
                this.printResults(tempOptimizer, population.get(0));
            }
        }

        while (true) {
            // Optimize on the complete training data.
            final Tuple<Integer, List<Individual>> newGeneration = this.optimize(
                    population, generalOptimizer, generation, maxGen, maxStableGen, minFitness, stopMillis
            );
            generation = newGeneration.getField0();
            population = newGeneration.getField1();
            Individual fittestIndividual = population.get(0);
            this.printResults(generalOptimizer, fittestIndividual);

            if (maxPartialExecutionRemovals > 0) {
                // Gather the PartialExecutions that are not well explained by the learned model.
                List<Tuple<PartialExecution, Double>> partialExecutionDeviations = new ArrayList<>();
                for (PartialExecution partialExecution : this.partialExecutions) {
                    final double timeEstimate = fittestIndividual.estimateTime(
                            partialExecution, this.platformOverheads, this.configuration
                    );
                    // Ratio of the larger to the smaller time; the +500 damps the ratio for very short runtimes.
                    double deviation = (Math.max(timeEstimate, partialExecution.getMeasuredExecutionTime()) + 500) /
                            (Math.min(timeEstimate, partialExecution.getMeasuredExecutionTime()) + 500);
                    if (deviation > partialExecutionRemovalThreshold) {
                        partialExecutionDeviations.add(new Tuple<>(partialExecution, deviation));
                    }
                }

                // Check if we actually have a good model.
                if (partialExecutionDeviations.isEmpty()) {
                    System.out.printf("All %d executions are explained well by the current model.\n", this.partialExecutions.size());
                    break;
                }

                // Check if we ran out of time.
                if (stopMillis > 0 && System.currentTimeMillis() >= stopMillis) break;

                // Remove the worst PartialExecutions.
                System.out.printf("The current model is not explaining well %d of %d measured executions.\n",
                        partialExecutionDeviations.size(),
                        this.partialExecutions.size()
                );
                // Sort by descending deviation so that the worst offenders are removed first.
                partialExecutionDeviations.sort((ped1, ped2) -> ped2.getField1().compareTo(ped1.getField1()));
                long numRemovables = maxPartialExecutionRemovals;
                for (Tuple<PartialExecution, Double> partialExecutionDeviation : partialExecutionDeviations) {
                    if (numRemovables-- <= 0) break;
                    final PartialExecution partialExecution = partialExecutionDeviation.getField0();
                    final double deviation = partialExecutionDeviation.getField1();
                    final double timeEstimate = fittestIndividual.estimateTime(
                            partialExecution, this.platformOverheads, this.configuration
                    );
                    System.out.printf("Removing %s... (estimated %s, deviation %,.2f)\n",
                            format(partialExecution), Formats.formatDuration(Math.round(timeEstimate)), deviation
                    );
                    this.partialExecutions.remove(partialExecution);
                }
            } else {
                break;
            }
        }

        String outputFile = this.configuration.getStringProperty("wayang.profiler.ga.output-file", null);
        if (outputFile != null) {
            Individual fittestIndividual = population.get(0);
            try (PrintStream printStream = new PrintStream(new FileOutputStream(outputFile))) {
                this.printLearnedConfiguration(generalOptimizer, fittestIndividual, printStream);
            } catch (FileNotFoundException e) {
                logger.error("Could not save learned configuration to output file.", e);
            }
        }
    }

    /**
     * Prints the training data of the {@code optimizer} (sorted by descending measured time) next to the estimates
     * of the {@code individual}, followed by the learned configuration.
     *
     * @param optimizer  provides the training data and activated genes
     * @param individual whose estimates and configuration should be printed
     */
    private void printResults(GeneticOptimizer optimizer, Individual individual) {
        // Print the training data vs. the estimates.
        System.out.println();
        System.out.printf("=== Stats for fittest individual (fitness=%,.4f)\n", individual.getFitness());
        System.out.println();
        System.out.println("Training data vs. measured");
        System.out.println("==========================");
        List<PartialExecution> data = new ArrayList<>(optimizer.getData());
        data.sort((e1, e2) -> Long.compare(e2.getMeasuredExecutionTime(), e1.getMeasuredExecutionTime()));
        for (PartialExecution partialExecution : data) {
            final double timeEstimate = individual.estimateTime(partialExecution, this.platformOverheads, this.configuration);
            // Both durations use the same column width so that the table lines up.
            System.out.printf("Actual %13s | Estimated: %13s | %3d execution groups | %s\n",
                    Formats.formatDuration(partialExecution.getMeasuredExecutionTime()),
                    Formats.formatDuration(Math.round(timeEstimate)),
                    partialExecution.getAtomicExecutionGroups().size(),
                    Stream.concat(
                            partialExecution.getAtomicExecutionGroups().stream().map(AtomicExecutionGroup::toString),
                            partialExecution.getInitializedPlatforms().stream().map(Platform::getName)
                    ).collect(Collectors.toList())
            );
        }
        System.out.println();
        System.out.println("Configuration file");
        System.out.println("==================");
        this.printLearnedConfiguration(optimizer, individual, System.out);
    }

    /**
     * Prints the platform overheads and estimator configurations whose {@link Variable}s are all activated genes of
     * the {@code optimizer}, using the values of the given {@code individual}.
     *
     * @param optimizer  determines which {@link Variable}s have been optimized
     * @param individual provides the {@link Variable} values
     * @param out        the target to print to
     */
    private void printLearnedConfiguration(GeneticOptimizer optimizer, Individual individual, PrintStream out) {
        final Bitmask genes = optimizer.getActivatedGenes();
        Set<Variable> optimizedVariables = new HashSet<>(genes.cardinality());
        for (int gene = genes.nextSetBit(0); gene != -1; gene = genes.nextSetBit(gene + 1)) {
            optimizedVariables.add(this.optimizationSpace.getVariable(gene));
        }
        for (Map.Entry<Platform, Variable> entry : this.platformOverheads.entrySet()) {
            final Platform platform = entry.getKey();
            final Variable overhead = entry.getValue();
            if (!optimizedVariables.contains(overhead)) continue;
            out.printf("wayang.%s.init.ms = %d\n",
                    platform.getConfigurationName(),
                    Math.round(overhead.getValue(individual))
            );
        }
        // The map values are already typed as DynamicLoadProfileEstimator, so no instanceof check is needed.
        for (DynamicLoadProfileEstimator dynamicLoadProfileEstimator : this.estimators.values()) {
            if (!optimizedVariables.containsAll(dynamicLoadProfileEstimator.getEmployedVariables())) continue;
            out.println(dynamicLoadProfileEstimator.toJsonConfig(individual));
        }
    }

    /**
     * Creates a new {@link GeneticOptimizer} that uses the given {@link PartialExecution}s as training data.
     *
     * @param partialExecutions the training data
     * @return the {@link GeneticOptimizer}
     */
    private GeneticOptimizer createOptimizer(Collection<PartialExecution> partialExecutions) {
        return new GeneticOptimizer(
                this.optimizationSpace,
                partialExecutions,
                this.estimators,
                this.platformOverheads,
                this.configuration
        );
    }

    /**
     * Runs {@code numTribes} independent optimizations starting from the same {@code individuals}, pools the fittest
     * share of each tribe, and returns the fittest {@code individuals.size()} individuals of that pool.
     *
     * @param numTribes the number of independent optimizations; values below {@code 1} are treated as {@code 1}
     * @return the highest reached generation number and the new population (sorted by fitness)
     */
    private Tuple<Integer, List<Individual>> superOptimize(
            int numTribes,
            List<Individual> individuals,
            Collection<PartialExecution> partialExecutions,
            int currentGeneration,
            int maxGenerations,
            int maxStableGenerations,
            double minFitness,
            long stopMillis) {

        // Guard against a misconfigured number of tribes, which would otherwise divide by zero.
        final int effectiveNumTribes = Math.max(1, numTribes);
        // Ceiling division, so that the pooled tribes contain at least individuals.size() members.
        int individualsPerTribe = (individuals.size() + effectiveNumTribes - 1) / effectiveNumTribes;
        List<Individual> superpopulation = new ArrayList<>(individualsPerTribe * effectiveNumTribes);
        int maxGeneration = 0;
        for (int i = 0; i < effectiveNumTribes; i++) {
            final Tuple<Integer, List<Individual>> population = this.optimize(
                    individuals, partialExecutions, currentGeneration, maxGenerations, maxStableGenerations, minFitness, stopMillis
            );
            maxGeneration = Math.max(maxGeneration, population.getField0());
            final List<Individual> tribe = population.getField1();
            superpopulation.addAll(tribe.subList(0, Math.min(individualsPerTribe, tribe.size())));
        }
        superpopulation.sort(Individual.fitnessComparator);
        // Return an independent list rather than a view into the temporary superpopulation.
        final int populationSize = Math.min(individuals.size(), superpopulation.size());
        return new Tuple<>(maxGeneration, new ArrayList<>(superpopulation.subList(0, populationSize)));
    }

    /**
     * Optimizes the {@code individuals} on the given training data with a freshly created {@link GeneticOptimizer}.
     *
     * @return the new generation number and the evolved population
     */
    private Tuple<Integer, List<Individual>> optimize(
            List<Individual> individuals,
            Collection<PartialExecution> partialExecutions,
            int currentGeneration,
            int maxGenerations,
            int maxStableGenerations,
            double minFitness,
            long stopMillis) {

        GeneticOptimizer optimizer = this.createOptimizer(partialExecutions);
        return this.optimize(individuals, optimizer, currentGeneration, maxGenerations, maxStableGenerations, minFitness, stopMillis);
    }

    /**
     * Evolves the {@code individuals} with the given {@code optimizer} for at most {@code maxGenerations}
     * generations. The run stops early when the time limit ({@code stopMillis > 0}) has passed, or when the fitness
     * improved by less than {@code 0.001} over the last {@code maxStableGenerations} generations while being at
     * least {@code minFitness}.
     *
     * @return the new generation number and the evolved population
     */
    private Tuple<Integer, List<Individual>> optimize(
            List<Individual> individuals,
            GeneticOptimizer optimizer,
            int currentGeneration,
            int maxGenerations,
            int maxStableGenerations,
            double minFitness,
            long stopMillis) {

        if (optimizer.getActivatedGenes().isEmpty()) {
            System.out.println("There is an optimization task without optimizable genes. It will be skipped");
            return new Tuple<>(currentGeneration, individuals);
        }

        int updateFrequency = (int) this.configuration.getLongProperty("wayang.profiler.ga.intermediateupdate", 10000);
        System.out.printf("Optimizing %d variables on %d partial executions (e.g., %s).\n",
                optimizer.getActivatedGenes().cardinality(),
                optimizer.getData().size(),
                WayangCollections.getAny(optimizer.getData()).getAtomicExecutionGroups()
        );
        optimizer.updateFitness(individuals);
        double checkpointedFitness = Double.NEGATIVE_INFINITY;
        int i;
        for (i = 0; i < maxGenerations; i++, currentGeneration++) {
            // Print status.
            if (i % maxStableGenerations == 0) {
                System.out.printf(
                        "Fittest individual of generation %,d (%,d): %,.4f\n",
                        i,
                        currentGeneration,
                        individuals.get(0).getFitness()
                );
            }

            individuals = optimizer.evolve(individuals);

            if (updateFrequency > 0 && i > 0 && i % updateFrequency == 0) {
                System.out.println("Intermediate update:");
                this.printResults(optimizer, individuals.get(0));
            }

            // Check if the time limit has passed.
            if (stopMillis > 0 && stopMillis <= System.currentTimeMillis()) break;

            // Check whether we seem to be stuck in a (local) optimum.
            if (i % maxStableGenerations == 0) {
                final double currentFitness = individuals.get(0).getFitness();
                if (!(currentFitness >= checkpointedFitness + 0.001) && currentFitness >= minFitness && i > 0) {
                    break;
                } else {
                    checkpointedFitness = currentFitness;
                }
            }
        }

        System.out.printf(
                "Final fittest individual of generation %,d (%,d): %,.4f\n",
                i,
                currentGeneration,
                individuals.get(0).getFitness()
        );

        return new Tuple<>(currentGeneration, individuals);
    }

    /**
     * Group {@link PartialExecution}s by the set of {@link LoadProfileEstimator} template keys they comprise.
     *
     * @param partialExecutions the {@link PartialExecution}s
     * @return the {@code partialExecutions} grouped by their template keys
     */
    private Map<Set<String>, List<PartialExecution>> groupPartialExecutions(Collection<PartialExecution> partialExecutions) {
        Map<Set<String>, List<PartialExecution>> groups = new HashMap<>();
        for (PartialExecution partialExecution : partialExecutions) {
            // Determine the estimator template keys in the partialExecution.
            final Set<String> templateKeys = this.getLoadProfileEstimatorTemplateKeys(partialExecution);
            // Index the partialExecution.
            groups.computeIfAbsent(templateKeys, key -> new LinkedList<>()).add(partialExecution);
        }
        return groups;
    }

    /**
     * Extract the {@link LoadProfileEstimator} template keys in the given {@link PartialExecution}.
     *
     * @param partialExecution the {@link PartialExecution}
     * @return the template keys of the {@link LoadProfileEstimator}s of all contained {@link AtomicExecution}s
     */
    private Set<String> getLoadProfileEstimatorTemplateKeys(PartialExecution partialExecution) {
        return partialExecution.getAtomicExecutionGroups().stream()
                .flatMap(group -> group.getAtomicExecutions().stream())
                .flatMap(execution -> execution.getLoadProfileEstimator().getTemplateKeys().stream())
                .collect(Collectors.toSet());
    }

    /**
     * Bin given {@link PartialExecution}s by their execution time on a logarithmic scale and retain one
     * representative (the last one encountered) per bin.
     *
     * @param partialExecutions the {@link PartialExecution}s
     * @param densityFactor     the stretch of each bin; expected to be {@code > 1}
     * @return the binned {@link PartialExecution}s
     */
    private Collection<PartialExecution> binByExecutionTime(Collection<PartialExecution> partialExecutions, double densityFactor) {
        Map<Integer, PartialExecution> resultBins = new HashMap<>();
        for (PartialExecution partialExecution : partialExecutions) {
            // Bin index = round(log_densityFactor(1 + measured time)).
            int key = (int) Math.round(Math.log1p(partialExecution.getMeasuredExecutionTime()) / Math.log(densityFactor));
            resultBins.put(key, partialExecution);
        }
        return resultBins.values();
    }

    /**
     * Creates a human-readable one-line description of the given {@link PartialExecution}.
     */
    private static String format(PartialExecution partialExecution) {
        return String.format("[%d atomic execution groups in %s: %s, %s]",
                partialExecution.getAtomicExecutionGroups().size(),
                Formats.formatDuration(partialExecution.getMeasuredExecutionTime()),
                partialExecution.getAtomicExecutionGroups(),
                partialExecution.getInitializedPlatforms()
        );
    }

    /**
     * Entry point.
     *
     * @param args optional: {@code [0]} a configuration file URL, {@code [1]} the value for
     *             {@code wayang.core.log.executions}
     */
    public static void main(String[] args) {
        Configuration configuration = args.length == 0 ? new Configuration() : new Configuration(args[0]);
        if (args.length >= 2) {
            configuration.setProperty("wayang.core.log.executions", args[1]);
        }
        new GeneticOptimizerApp(configuration).run();
    }
}