| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.profiler.log; |
| |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.PrintStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| import org.apache.wayang.core.optimizer.costs.EstimationContext; |
| import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.platform.AtomicExecution; |
| import org.apache.wayang.core.platform.AtomicExecutionGroup; |
| import org.apache.wayang.core.platform.PartialExecution; |
| import org.apache.wayang.core.platform.Platform; |
| import org.apache.wayang.core.profiling.ExecutionLog; |
| import org.apache.wayang.core.util.Bitmask; |
| import org.apache.wayang.core.util.Formats; |
| import org.apache.wayang.core.util.Tuple; |
| import org.apache.wayang.core.util.WayangCollections; |
| import org.apache.wayang.java.Java; |
| import org.apache.wayang.postgres.Postgres; |
| import org.apache.wayang.spark.Spark; |
| import org.apache.wayang.sqlite3.Sqlite3; |
| |
| /** |
| * This app tries to infer good {@link LoadProfileEstimator}s for {@link ExecutionOperator}s using data from an |
| * {@link ExecutionLog}. |
| */ |
| public class GeneticOptimizerApp { |
| |
| private static final Logger logger = LogManager.getLogger(GeneticOptimizerApp.class); |
| |
| /** |
| * {@link Configuration} to be used. |
| */ |
| final Configuration configuration; |
| |
| /** |
| * Maintains {@link Variable}s to be optimized. |
| */ |
| OptimizationSpace optimizationSpace; |
| |
| /** |
| * Maintains {@link PartialExecution}s as training data. |
| */ |
| List<PartialExecution> partialExecutions; |
| |
| /** |
| * The {@link #partialExecutions} grouped by their containing {@link ExecutionOperator}s. |
| */ |
| private final List<List<PartialExecution>> partialExecutionGroups; |
| |
| /** |
| * Maintains a {@link DynamicLoadProfileEstimator} for every {@link Configuration} key in the |
| * {@link #partialExecutions}. |
| */ |
| Map<String, DynamicLoadProfileEstimator> estimators; |
| |
| /** |
| * Maintains variables that quantify the overhead for initializing a {@link Platform}. |
| */ |
| Map<Platform, Variable> platformOverheads = new HashMap<>(); |
| |
| /** |
| * Creates a new instance. |
| * |
| * @param configuration provides, amongst others, platform specifications |
| */ |
| public GeneticOptimizerApp(Configuration configuration) { |
| this.configuration = configuration; |
| |
| // Initialize platforms. |
| Java.platform(); |
| Spark.platform(); |
| Sqlite3.platform(); |
| Postgres.platform(); |
| //TODO: add dinamically Graphchi, if the version of scala is 2.11 |
| |
| // Load the ExecutionLog. |
| double samplingFactor = this.configuration.getDoubleProperty("wayang.profiler.ga.sampling", 1d); |
| double maxCardinalitySpread = this.configuration.getDoubleProperty("wayang.profiler.ga.max-cardinality-spread", 1d); |
| double minCardinalityConfidence = this.configuration.getDoubleProperty("wayang.profiler.ga.min-cardinality-confidence", 1d); |
| long minExecutionTime = this.configuration.getLongProperty("wayang.profiler.ga.min-exec-time", 1); |
| try (ExecutionLog executionLog = ExecutionLog.open(configuration)) { |
| this.partialExecutions = executionLog.stream().collect(Collectors.toList()); |
| |
| int lastSize = this.partialExecutions.size(); |
| this.partialExecutions.removeIf(partialExecution -> !this.checkEstimatorTemplates(partialExecution)); |
| int newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions with no template-based estimators.\n", lastSize - newSize); |
| lastSize = newSize; |
| |
| this.partialExecutions.removeIf(partialExecution -> !this.checkSpread(partialExecution, maxCardinalitySpread)); |
| newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions with a too large cardinality spread (> %.2f).\n", lastSize - newSize, minCardinalityConfidence); |
| lastSize = newSize; |
| |
| this.partialExecutions.removeIf(partialExecution -> !this.checkNonEmptyCardinalities(partialExecution)); |
| newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions with zero cardinalities.\n", lastSize - newSize); |
| lastSize = newSize; |
| |
| this.partialExecutions.removeIf(partialExecution -> !this.checkConfidence(partialExecution, minCardinalityConfidence)); |
| newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions with a too low cardinality confidence (< %.2f).\n", lastSize - newSize, minCardinalityConfidence); |
| lastSize = newSize; |
| |
| this.partialExecutions.removeIf(partialExecution -> partialExecution.getMeasuredExecutionTime() < minExecutionTime); |
| newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions with a too short runtime (< %,d ms).\n", lastSize - newSize, minExecutionTime); |
| lastSize = newSize; |
| |
| this.partialExecutions.removeIf(partialExecution -> new Random().nextDouble() > samplingFactor); |
| newSize = this.partialExecutions.size(); |
| System.out.printf("Removed %d executions due to sampling.\n", lastSize - newSize); |
| } catch (Exception e) { |
| throw new WayangException("Could not evaluate execution log.", e); |
| } |
| |
| // Group the PartialExecutions. |
| this.partialExecutionGroups = this.groupPartialExecutions(this.partialExecutions).entrySet().stream() |
| .sorted(Comparator.comparingInt(e -> e.getKey().size())) |
| .map(Map.Entry::getValue) |
| .collect(Collectors.toList()); |
| |
| // Apply binning if requested. |
| double binningStretch = this.configuration.getDoubleProperty("wayang.profiler.ga.binning", 1.1d); |
| if (binningStretch > 1d) { |
| System.out.print("Applying binning... "); |
| int numOriginalPartialExecutions = this.partialExecutions.size(); |
| this.partialExecutions.clear(); |
| for (List<PartialExecution> group : this.partialExecutionGroups) { |
| final Collection<PartialExecution> reducedGroup = this.binByExecutionTime(group, binningStretch); |
| group.retainAll(reducedGroup); |
| this.partialExecutions.addAll(reducedGroup); |
| } |
| System.out.printf( |
| "reduced the number of partial executions from %d to %d.\n", |
| numOriginalPartialExecutions, this.partialExecutions.size() |
| ); |
| } |
| |
| // Initialize the optimization space with its LoadProfileEstimators and associated Variables. |
| this.optimizationSpace = new OptimizationSpace(); |
| this.estimators = new HashMap<>(); |
| this.platformOverheads = new HashMap<>(); |
| |
| for (PartialExecution partialExecution : this.partialExecutions) { |
| // Instrument the partial executions. |
| for (AtomicExecutionGroup executionGroup : partialExecution.getAtomicExecutionGroups()) { |
| for (AtomicExecution atomicExecution : executionGroup.getAtomicExecutions()) { |
| this.instrument(atomicExecution); |
| } |
| } |
| |
| for (Platform platform : partialExecution.getInitializedPlatforms()) { |
| this.platformOverheads.computeIfAbsent( |
| platform, |
| key -> this.optimizationSpace.getOrCreateVariable(key.getClass().getCanonicalName() + "->overhead") |
| ); |
| } |
| } |
| |
| System.out.printf( |
| "Loaded %d execution records with %d template-based estimators types and %d platform overheads.\n", |
| this.partialExecutions.size(), estimators.keySet().size(), this.platformOverheads.size() |
| ); |
| } |
| |
| /** |
| * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} are sufficiently confident. |
| * |
| * @param partialExecution whose {@link CardinalityEstimate}s should be checked |
| * @param minCardinalityConfidence the minimum confidence |
| * @return whether the {@link CardinalityEstimate}s are sufficiently confident |
| */ |
| private boolean checkConfidence(PartialExecution partialExecution, double minCardinalityConfidence) { |
| return partialExecution.getAtomicExecutionGroups().stream().allMatch( |
| executionGroup -> { |
| final EstimationContext estimationContext = executionGroup.getEstimationContext(); |
| for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getCorrectnessProbability() < minCardinalityConfidence) return false; |
| } |
| for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getCorrectnessProbability() < minCardinalityConfidence) return false; |
| } |
| return true; |
| } |
| ); |
| } |
| |
| /** |
| * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} are greater than zero. |
| * |
| * @param partialExecution whose {@link CardinalityEstimate}s should be checked |
| * @return whether the {@link CardinalityEstimate}s are not equal to zero |
| */ |
| private boolean checkNonEmptyCardinalities(PartialExecution partialExecution) { |
| return partialExecution.getAtomicExecutionGroups().stream().allMatch( |
| executionGroup -> { |
| final EstimationContext estimationContext = executionGroup.getEstimationContext(); |
| for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getUpperEstimate() == 0) { |
| return false; |
| } |
| } |
| for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getUpperEstimate() == 0) { |
| return false; |
| } |
| } |
| return true; |
| } |
| ); |
| } |
| |
| /** |
| * Check if this {@link PartialExecution} contains any template-based {@link LoadProfileEstimator}s. |
| * |
| * @param partialExecution that should be checked |
| * @return whether the {@code partialExecution} contains at least one template |
| */ |
| private boolean checkEstimatorTemplates(PartialExecution partialExecution) { |
| return !this.getLoadProfileEstimatorTemplateKeys(partialExecution).isEmpty(); |
| } |
| |
| /** |
| * Check if all {@link CardinalityEstimate}s for the {@link PartialExecution} are sufficiently narrow. |
| * |
| * @param partialExecution whose {@link CardinalityEstimate}s should be checked |
| * @param maxCardinalitySpread the maximum spread of the {@link CardinalityEstimate}s |
| * @return whether the {@link CardinalityEstimate}s are sufficiently narrow |
| */ |
| private boolean checkSpread(PartialExecution partialExecution, double maxCardinalitySpread) { |
| return partialExecution.getAtomicExecutionGroups().stream().allMatch( |
| executionGroup -> { |
| final EstimationContext estimationContext = executionGroup.getEstimationContext(); |
| for (CardinalityEstimate cardinality : estimationContext.getInputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getLowerEstimate() * maxCardinalitySpread < cardinality.getUpperEstimate()) { |
| return false; |
| } |
| } |
| for (CardinalityEstimate cardinality : estimationContext.getOutputCardinalities()) { |
| if (cardinality == null) continue; |
| if (cardinality.getLowerEstimate() * maxCardinalitySpread < cardinality.getUpperEstimate()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| ); |
| } |
| |
| /** |
| * Wrap the {@link LoadProfileEstimator}s of the given {@link AtomicExecution} with {@link DynamicLoadEstimator}s |
| * where possible. |
| * |
| * @param atomicExecution that should be instrumented |
| */ |
| private void instrument(AtomicExecution atomicExecution) { |
| final DynamicLoadProfileEstimator dynamicLoadProfileEstimator = DynamicLoadProfileEstimators.createEstimatorFor( |
| atomicExecution.getLoadProfileEstimator(), |
| this.configuration, |
| this.optimizationSpace |
| ); |
| atomicExecution.setLoadProfileEstimator(dynamicLoadProfileEstimator); |
| |
| // Keep track of the estimators. |
| Queue<LoadProfileEstimator> instrumentedEstimators = new LinkedList<>(); |
| instrumentedEstimators.add(dynamicLoadProfileEstimator); |
| while (!instrumentedEstimators.isEmpty()) { |
| final LoadProfileEstimator estimator = instrumentedEstimators.poll(); |
| if (estimator instanceof DynamicLoadProfileEstimator && estimator.getConfigurationKey() != null) { |
| this.estimators.put(estimator.getConfigurationKey(), (DynamicLoadProfileEstimator) estimator); |
| } |
| instrumentedEstimators.addAll(estimator.getNestedEstimators()); |
| } |
| } |
| |
| |
| public void run() { |
| if (this.optimizationSpace.getNumDimensions() == 0) { |
| System.out.println("There is nothing to optimize - all estimators are specified in the configuration."); |
| System.exit(0); |
| } |
| |
| // Initialize form the configuration. |
| long timeLimit = this.configuration.getLongProperty("wayang.profiler.ga.timelimit.ms", -1); |
| long stopMillis = timeLimit > 0 ? System.currentTimeMillis() + timeLimit : -1L; |
| int maxGen = (int) this.configuration.getLongProperty("wayang.profiler.ga.maxgenerations", 5000); |
| int maxStableGen = (int) this.configuration.getLongProperty("wayang.profiler.ga.maxstablegenerations", 2000); |
| double minFitness = this.configuration.getDoubleProperty("wayang.profiler.ga.minfitness", .0d); |
| int superOptimizations = (int) this.configuration.getLongProperty("wayang.profiler.ga.superoptimizations", 3); |
| boolean isBlocking = this.configuration.getBooleanProperty("wayang.profiler.ga.blocking", false); |
| long maxPartialExecutionRemovals = this.configuration.getLongProperty("wayang.profiler.ga.noise-filter.max", 3); |
| double partialExecutionRemovalThreshold = this.configuration.getDoubleProperty("wayang.profiler.ga.noise-filter.threshold", 2); |
| |
| // Create the root optimizer and an initial population. |
| GeneticOptimizer generalOptimizer = this.createOptimizer(this.partialExecutions); |
| List<Individual> population = generalOptimizer.createInitialPopulation(); |
| int generation = 0; |
| |
| // Optimize on blocks. |
| if (isBlocking) { |
| for (List<PartialExecution> group : this.partialExecutionGroups) { |
| final PartialExecution representative = WayangCollections.getAny(group); |
| final Set<String> templateKeys = this.getLoadProfileEstimatorTemplateKeys(representative); |
| if (group.size() < 2) { |
| System.out.printf("Few measurement points for %s\n", templateKeys); |
| } |
| if (representative.getAtomicExecutionGroups().size() > 3) { |
| System.out.printf("Many subjects for %s\n", templateKeys); |
| } |
| |
| long minExecTime = group.stream().mapToLong(PartialExecution::getMeasuredExecutionTime).min().getAsLong(); |
| long maxExecTime = group.stream().mapToLong(PartialExecution::getMeasuredExecutionTime).max().getAsLong(); |
| if (maxExecTime - minExecTime < 1000) { |
| System.out.printf("Narrow training data for %s\n", templateKeys); |
| continue; |
| } |
| |
| final Tuple<Integer, List<Individual>> newGeneration = this.superOptimize( |
| superOptimizations, population, group, generation, maxGen, maxStableGen, minFitness, stopMillis |
| ); |
| generation = newGeneration.getField0(); |
| population = newGeneration.getField1(); |
| |
| final GeneticOptimizer tempOptimizer = this.createOptimizer(group); |
| this.printResults(tempOptimizer, population.get(0)); |
| } |
| } |
| |
| while (true) { |
| // Optimize on the complete training data. |
| final Tuple<Integer, List<Individual>> newGeneration = this.optimize( |
| population, generalOptimizer, generation, maxGen, maxStableGen, minFitness, stopMillis |
| ); |
| generation = newGeneration.getField0(); |
| population = newGeneration.getField1(); |
| Individual fittestIndividual = population.get(0); |
| printResults(generalOptimizer, fittestIndividual); |
| |
| if (maxPartialExecutionRemovals > 0) { |
| // Gather the PartialExecutions that are not well explained by the learned model. |
| List<Tuple<PartialExecution, Double>> partialExecutionDeviations = new ArrayList<>(); |
| for (PartialExecution partialExecution : partialExecutions) { |
| final double timeEstimate = fittestIndividual.estimateTime( |
| partialExecution, this.platformOverheads, this.configuration |
| ); |
| double deviation = (Math.max(timeEstimate, partialExecution.getMeasuredExecutionTime()) + 500) / |
| (Math.min(timeEstimate, partialExecution.getMeasuredExecutionTime()) + 500); |
| if (deviation > partialExecutionRemovalThreshold) { |
| partialExecutionDeviations.add(new Tuple<>(partialExecution, deviation)); |
| } |
| } |
| |
| // Check if we actually have a good model. |
| if (partialExecutionDeviations.isEmpty()) { |
| System.out.printf("All %d executions are explained well by the current model.\n", this.partialExecutions.size()); |
| break; |
| } |
| |
| // Check if we ran out of time. |
| if (stopMillis > 0 && System.currentTimeMillis() >= stopMillis) break; |
| |
| // Remove the worst PartialExecutions. |
| System.out.printf("The current model is not explaining well %d of %d measured executions.\n", |
| partialExecutionDeviations.size(), |
| this.partialExecutions.size() |
| ); |
| partialExecutionDeviations.sort((ped1, ped2) -> ped2.getField1().compareTo(ped1.getField1())); |
| long numRemovables = maxPartialExecutionRemovals; |
| for (Tuple<PartialExecution, Double> partialExecutionDeviation : partialExecutionDeviations) { |
| if (numRemovables-- <= 0) break; |
| final PartialExecution partialExecution = partialExecutionDeviation.getField0(); |
| final double deviation = partialExecutionDeviation.getField1(); |
| final double timeEstimate = fittestIndividual.estimateTime( |
| partialExecution, this.platformOverheads, this.configuration |
| ); |
| System.out.printf("Removing %s... (estimated %s, deviation %,.2f)\n", |
| format(partialExecution), Formats.formatDuration(Math.round(timeEstimate)), deviation |
| ); |
| this.partialExecutions.remove(partialExecution); |
| } |
| } else { |
| break; |
| } |
| } |
| |
| String outputFile = this.configuration.getStringProperty("wayang.profiler.ga.output-file", null); |
| if (outputFile != null) { |
| Individual fittestIndividual = population.get(0); |
| try (PrintStream printStream = new PrintStream(new FileOutputStream(outputFile))) { |
| this.printLearnedConfiguration(generalOptimizer, fittestIndividual, printStream); |
| } catch (FileNotFoundException e) { |
| logger.error("Could not save learned configuration to output file.", e); |
| } |
| } |
| |
| } |
| |
| private void printResults(GeneticOptimizer optimizer, Individual individual) { |
| // Print the training data vs. the estimates. |
| System.out.println(); |
| System.out.printf("=== Stats for fittest individual (fitness=%,.4f)\n", individual.getFitness()); |
| System.out.println(); |
| System.out.println("Training data vs. measured"); |
| System.out.println("=========================="); |
| List<PartialExecution> data = new ArrayList<>(optimizer.getData()); |
| data.sort((e1, e2) -> Long.compare(e2.getMeasuredExecutionTime(), e1.getMeasuredExecutionTime())); |
| for (PartialExecution partialExecution : data) { |
| final double timeEstimate = individual.estimateTime(partialExecution, this.platformOverheads, this.configuration); |
| System.out.printf("Actual %13s | Estimated: %72s | %3d execution groups | %s\n", |
| Formats.formatDuration(partialExecution.getMeasuredExecutionTime()), |
| Formats.formatDuration(Math.round(timeEstimate)), |
| partialExecution.getAtomicExecutionGroups().size(), |
| Stream.concat( |
| partialExecution.getAtomicExecutionGroups().stream().map(AtomicExecutionGroup::toString), |
| partialExecution.getInitializedPlatforms().stream().map(Platform::getName) |
| ).collect(Collectors.toList()) |
| ); |
| } |
| |
| System.out.println(); |
| System.out.println("Configuration file"); |
| System.out.println("=================="); |
| this.printLearnedConfiguration(optimizer, individual, System.out); |
| } |
| |
| private void printLearnedConfiguration(GeneticOptimizer optimizer, Individual individual, PrintStream out) { |
| final Bitmask genes = optimizer.getActivatedGenes(); |
| Set<Variable> optimizedVariables = new HashSet<>(genes.cardinality()); |
| for (int gene = genes.nextSetBit(0); gene != -1; gene = genes.nextSetBit(gene + 1)) { |
| optimizedVariables.add(this.optimizationSpace.getVariable(gene)); |
| } |
| for (Map.Entry<Platform, Variable> entry : this.platformOverheads.entrySet()) { |
| final Platform platform = entry.getKey(); |
| final Variable overhead = entry.getValue(); |
| if (!optimizedVariables.contains(overhead)) continue; |
| |
| out.printf("wayang.%s.init.ms = %d\n", |
| platform.getConfigurationName(), |
| Math.round(overhead.getValue(individual)) |
| ); |
| } |
| for (LoadProfileEstimator estimator : estimators.values()) { |
| if (estimator instanceof DynamicLoadProfileEstimator) { |
| final DynamicLoadProfileEstimator dynamicLoadProfileEstimator = (DynamicLoadProfileEstimator) estimator; |
| if (!optimizedVariables.containsAll(dynamicLoadProfileEstimator.getEmployedVariables())) continue; |
| out.println(dynamicLoadProfileEstimator.toJsonConfig(individual)); |
| } |
| } |
| } |
| |
| /** |
| * Creates a new {@link GeneticOptimizer} that used the given {@link PartialExecution}s as training data. |
| * |
| * @param partialExecutions the training data |
| * @return the {@link GeneticOptimizer} |
| */ |
| private GeneticOptimizer createOptimizer(Collection<PartialExecution> partialExecutions) { |
| return new GeneticOptimizer( |
| this.optimizationSpace, |
| partialExecutions, |
| this.estimators, |
| this.platformOverheads, |
| this.configuration |
| ); |
| } |
| |
| private Tuple<Integer, List<Individual>> superOptimize( |
| int numTribes, |
| List<Individual> individuals, |
| Collection<PartialExecution> partialExecutions, |
| int currentGeneration, |
| int maxGenerations, |
| int maxStableGenerations, |
| double minFitness, |
| long stopMillis) { |
| |
| int individualsPerTribe = (individuals.size() + numTribes - 1) / numTribes; |
| List<Individual> superpopulation = new ArrayList<>(individuals.size() * numTribes); |
| int maxGeneration = 0; |
| for (int i = 0; i < numTribes; i++) { |
| final Tuple<Integer, List<Individual>> population = this.optimize( |
| individuals, partialExecutions, currentGeneration, maxGenerations, maxStableGenerations, minFitness, stopMillis |
| ); |
| maxGeneration = Math.max(maxGeneration, population.getField0()); |
| superpopulation.addAll(population.getField1().subList(0, individualsPerTribe)); |
| } |
| superpopulation.sort(Individual.fitnessComparator); |
| return new Tuple<>(maxGeneration, superpopulation.subList(0, individuals.size())); |
| } |
| |
| private Tuple<Integer, List<Individual>> optimize( |
| List<Individual> individuals, |
| Collection<PartialExecution> partialExecutions, |
| int currentGeneration, |
| int maxGenerations, |
| int maxStableGenerations, |
| double minFitness, |
| long stopMillis) { |
| GeneticOptimizer optimizer = this.createOptimizer(partialExecutions); |
| return this.optimize(individuals, optimizer, currentGeneration, maxGenerations, maxStableGenerations, minFitness, stopMillis); |
| } |
| |
| private Tuple<Integer, List<Individual>> optimize( |
| List<Individual> individuals, |
| GeneticOptimizer optimizer, |
| int currentGeneration, |
| int maxGenerations, |
| int maxStableGenerations, |
| double minFitness, |
| long stopMillis) { |
| |
| if (optimizer.getActivatedGenes().isEmpty()) { |
| System.out.println("There is an optimization task without optimizable genes. It will be skipped"); |
| return new Tuple<>(currentGeneration, individuals); |
| } |
| |
| int updateFrequency = (int) this.configuration.getLongProperty("wayang.profiler.ga.intermediateupdate", 10000); |
| System.out.printf("Optimizing %d variables on %d partial executions (e.g., %s).\n", |
| optimizer.getActivatedGenes().cardinality(), |
| optimizer.getData().size(), |
| WayangCollections.getAny(optimizer.getData()).getAtomicExecutionGroups() |
| ); |
| |
| optimizer.updateFitness(individuals); |
| double checkpointedFitness = Double.NEGATIVE_INFINITY; |
| int i; |
| for (i = 0; i < maxGenerations; i++, currentGeneration++) { |
| // Print status. |
| if (i % maxStableGenerations == 0) { |
| System.out.printf( |
| "Fittest individual of generation %,d (%,d): %,.4f\n", |
| i, |
| currentGeneration, |
| individuals.get(0).getFitness() |
| ); |
| } |
| |
| individuals = optimizer.evolve(individuals); |
| |
| if (updateFrequency > 0 && i > 0 && i % updateFrequency == 0) { |
| System.out.println("Intermediate update:"); |
| this.printResults(optimizer, individuals.get(0)); |
| } |
| |
| // Check if the time limit has passed. |
| if (stopMillis > 0 && stopMillis <= System.currentTimeMillis()) break; |
| |
| // Check whether we seem to be stuck in a (local) optimum. |
| if (i % maxStableGenerations == 0) { |
| final double currentFitness = individuals.get(0).getFitness(); |
| if (!(currentFitness >= checkpointedFitness + 0.001) && currentFitness >= minFitness && i > 0) { |
| break; |
| } else { |
| checkpointedFitness = currentFitness; |
| } |
| } |
| } |
| |
| System.out.printf( |
| "Final fittest individual of generation %,d (%,d): %,.4f\n", |
| i, |
| currentGeneration, |
| individuals.get(0).getFitness() |
| ); |
| |
| return new Tuple<>(currentGeneration, individuals); |
| } |
| |
| /** |
| * Group {@link PartialExecution}s by their comprised {@link LoadProfileEstimator}s template. |
| * |
| * @param partialExecutions the {@link PartialExecution}s |
| * @return the grouping of the {@link #partialExecutions} |
| */ |
| private Map<Set<String>, List<PartialExecution>> groupPartialExecutions(Collection<PartialExecution> partialExecutions) { |
| Map<Set<String>, List<PartialExecution>> groups = new HashMap<>(); |
| for (PartialExecution partialExecution : partialExecutions) { |
| |
| // Determine the ExecutionOperator classes in the partialExecution. |
| final Set<String> templateKeys = this.getLoadProfileEstimatorTemplateKeys(partialExecution); |
| |
| // Index the partialExecution. |
| groups.computeIfAbsent(templateKeys, key -> new LinkedList<>()).add(partialExecution); |
| } |
| |
| return groups; |
| } |
| |
| /** |
| * Extract the {@link LoadProfileEstimator} template keys in the given {@link PartialExecution}. |
| * |
| * @param partialExecution the {@link PartialExecution} |
| * @return the {@link ExecutionOperator} {@link Class}es |
| */ |
| private Set<String> getLoadProfileEstimatorTemplateKeys(PartialExecution partialExecution) { |
| return partialExecution.getAtomicExecutionGroups().stream() |
| .flatMap(group -> group.getAtomicExecutions().stream()) |
| .flatMap(execution -> execution.getLoadProfileEstimator().getTemplateKeys().stream()) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Bin given {@link PartialExecution}s by their execution time and retain one representative per bin. |
| * |
| * @param partialExecutions the {@link PartialExecution}s |
| * @param densityFactor the stretch of each bin |
| * @return the binned {@link PartialExecution}s |
| */ |
| private Collection<PartialExecution> binByExecutionTime(Collection<PartialExecution> partialExecutions, double densityFactor) { |
| Map<Integer, PartialExecution> resultBins = new HashMap<>(); |
| for (PartialExecution partialExecution : partialExecutions) { |
| int key = (int) Math.round(Math.log1p(partialExecution.getMeasuredExecutionTime()) / Math.log(densityFactor)); |
| resultBins.put(key, partialExecution); |
| } |
| return resultBins.values(); |
| } |
| |
| private static String format(PartialExecution partialExecution) { |
| return String.format("[%d atomic execution groups in %s: %s, %s]", |
| partialExecution.getAtomicExecutionGroups().size(), |
| Formats.formatDuration(partialExecution.getMeasuredExecutionTime()), |
| partialExecution.getAtomicExecutionGroups(), |
| partialExecution.getInitializedPlatforms() |
| ); |
| } |
| |
| public static void main(String[] args) { |
| Configuration configuration = args.length == 0 ? new Configuration() : new Configuration(args[0]); |
| if (args.length >= 2) { |
| configuration.setProperty("wayang.core.log.executions", args[1]); |
| } |
| new GeneticOptimizerApp(configuration).run(); |
| } |
| } |