blob: 97454b3ab875587423de15a5b4e606907a2926b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.api;
import org.apache.wayang.commons.util.profiledb.instrumentation.StopWatch;
import org.apache.wayang.commons.util.profiledb.model.Experiment;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.monitor.DisabledMonitor;
import org.apache.wayang.core.monitor.FileMonitor;
import org.apache.wayang.core.monitor.Monitor;
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimatorManager;
import org.apache.wayang.core.optimizer.costs.TimeEstimate;
import org.apache.wayang.core.optimizer.costs.TimeToCostConverter;
import org.apache.wayang.core.optimizer.enumeration.ExecutionTaskFlow;
import org.apache.wayang.core.optimizer.enumeration.PlanEnumeration;
import org.apache.wayang.core.optimizer.enumeration.PlanEnumerator;
import org.apache.wayang.core.optimizer.enumeration.PlanImplementation;
import org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.PlanMetrics;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.platform.AtomicExecutionGroup;
import org.apache.wayang.core.platform.Breakpoint;
import org.apache.wayang.core.platform.CardinalityBreakpoint;
import org.apache.wayang.core.platform.ConjunctiveBreakpoint;
import org.apache.wayang.core.platform.CrossPlatformExecutor;
import org.apache.wayang.core.platform.ExecutionState;
import org.apache.wayang.core.platform.FixBreakpoint;
import org.apache.wayang.core.platform.NoIterationBreakpoint;
import org.apache.wayang.core.platform.PartialExecution;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.profiling.CardinalityRepository;
import org.apache.wayang.core.profiling.CostMeasurement;
import org.apache.wayang.core.profiling.ExecutionLog;
import org.apache.wayang.core.profiling.ExecutionPlanMeasurement;
import org.apache.wayang.core.profiling.InstrumentationStrategy;
import org.apache.wayang.core.profiling.PartialExecutionMeasurement;
import org.apache.wayang.core.util.Formats;
import org.apache.wayang.core.util.OneTimeExecutable;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
 * Describes a job that is to be executed using Wayang.
 * <p>
 * A {@link Job} can be {@link #execute() executed} only once. Execution optimizes the {@link WayangPlan} into an
 * {@link ExecutionPlan}, runs it via a {@link CrossPlatformExecutor} and, if re-optimization is enabled, pauses at
 * {@link Breakpoint}s to re-plan the not-yet-executed remainder.
 */
public class Job extends OneTimeExecutable {

    private final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * Guardian to avoid re-execution.
     */
    private final AtomicBoolean hasBeenExecuted = new AtomicBoolean(false);

    /**
     * References the {@link WayangContext} that spawned this instance.
     */
    private final WayangContext wayangContext;

    /**
     * {@link Job}-level {@link Configuration} based on the {@link WayangContext}-level configuration.
     */
    private final Configuration configuration;

    /**
     * The {@link WayangPlan} to be executed by this instance.
     */
    private final WayangPlan wayangPlan;

    /**
     * {@link OptimizationContext} for the {@link #wayangPlan}.
     */
    private DefaultOptimizationContext optimizationContext;

    /**
     * General purpose cache.
     */
    private final Map<String, Object> cache = new HashMap<>();

    /**
     * Executes the optimized {@link ExecutionPlan}.
     */
    private CrossPlatformExecutor crossPlatformExecutor;

    /**
     * Manages the {@link CardinalityEstimate}s for the {@link #wayangPlan}.
     */
    private CardinalityEstimatorManager cardinalityEstimatorManager;

    /**
     * Collects metadata w.r.t. the processing of this instance.
     */
    private final Experiment experiment;

    /**
     * {@link StopWatch} to measure some key figures for the {@link #experiment}.
     */
    private final StopWatch stopWatch;

    /**
     * {@link TimeMeasurement}s for the optimization and the execution phases.
     */
    private final TimeMeasurement optimizationRound, executionRound;

    /**
     * Collects the {@link TimeEstimate}s of all (partially) executed {@link PlanImplementation}s.
     */
    private final List<TimeEstimate> timeEstimates = new LinkedList<>();

    /**
     * Collects the cost estimates of all (partially) executed {@link PlanImplementation}s.
     */
    private final List<ProbabilisticDoubleInterval> costEstimates = new LinkedList<>();

    /**
     * JAR files that are needed to execute the UDFs.
     */
    private final Set<String> udfJarPaths = new HashSet<>();

    /**
     * Receives progress updates; a {@link DisabledMonitor} if monitoring is switched off in the {@link #configuration}.
     */
    private final Monitor monitor;

    /**
     * Name for this instance.
     */
    private final String name;

    /**
     * <i>Currently not used.</i>
     */
    private final StageAssignmentTraversal.StageSplittingCriterion stageSplittingCriterion =
            (producerTask, channel, consumerTask) -> false;

    /**
     * The {@link PlanImplementation} that is being executed.
     */
    private PlanImplementation planImplementation;

    /**
     * Controls at which {@link CardinalityEstimate}s the execution should be interrupted.
     * {@code null} if re-optimization is disabled.
     */
    private final CardinalityBreakpoint cardinalityBreakpoint;

    /**
     * Whether {@link Breakpoint}s may be requested proactively (see {@link #isRequestBreakpointFor}).
     */
    private final boolean isProactiveReoptimization;

    /**
     * Creates a new instance.
     *
     * @param wayangContext the {@link WayangContext} spawning this instance; its {@link Configuration} is forked
     * @param name          name for this instance or {@code null} if a default name should be picked
     * @param monitor       the {@link Monitor} to use when monitoring is enabled, or {@code null} for a
     *                      {@link FileMonitor}
     * @param wayangPlan    the {@link WayangPlan} to execute
     * @param experiment    an {@link Experiment} for that profiling entries will be created
     * @param udfJars       paths to JAR files needed to run the UDFs (see {@link ReflectionUtils#getDeclaringJar(Class)})
     */
    Job(WayangContext wayangContext, String name, Monitor monitor, WayangPlan wayangPlan, Experiment experiment, String... udfJars) {
        this.wayangContext = wayangContext;
        this.name = name == null ? "Wayang app" : name;
        this.configuration = this.wayangContext.getConfiguration().fork(this.name);
        this.wayangPlan = wayangPlan;
        for (String udfJar : udfJars) {
            this.addUdfJar(udfJar);
        }

        // Prepare re-optimization.
        if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) {
            this.cardinalityBreakpoint = new CardinalityBreakpoint(this.configuration);
            this.isProactiveReoptimization =
                    this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize.proactive", false);
        } else {
            this.cardinalityBreakpoint = null;
            this.isProactiveReoptimization = false;
        }

        // Prepare instrumentation.
        this.experiment = experiment;
        this.stopWatch = new StopWatch(experiment);
        this.optimizationRound = this.stopWatch.getOrCreateRound("Optimization");
        this.executionRound = this.stopWatch.getOrCreateRound("Execution");

        // Configure job monitor.
        if (Monitor.isEnabled(this.configuration)) {
            this.monitor = monitor == null ? new FileMonitor() : monitor;
        } else {
            this.monitor = new DisabledMonitor();
        }
    }

    /**
     * Adds a {@code path} to a JAR that is required in one or more UDFs.
     *
     * @see ReflectionUtils#getDeclaringJar(Class)
     */
    public void addUdfJar(String path) {
        this.udfJarPaths.add(path);
    }

    /**
     * Run this instance. Must only be called once.
     *
     * @throws WayangException in case the execution fails for any reason
     */
    @Override
    public void execute() throws WayangException {
        try {
            super.execute();
        } catch (WayangException e) {
            throw e;
        } catch (Throwable t) {
            throw new WayangException("Job execution failed.", t);
        }
    }

    /**
     * Prepares the {@link #wayangPlan}, estimates its key figures and compiles an initial {@link ExecutionPlan}
     * without executing it. Note that this does not mark this instance as executed.
     *
     * @return the initial {@link ExecutionPlan}
     * @throws WayangException if no execution plan could be found
     */
    public ExecutionPlan buildInitialExecutionPlan() throws WayangException {
        this.prepareWayangPlan();
        this.estimateKeyFigures();
        return this.createInitialExecutionPlan();
    }

    /**
     * Forwards the progress of a single operator to the {@link #monitor}. Failures are logged and otherwise ignored,
     * so that progress reporting never interrupts the job.
     *
     * @param opName   name of the operator
     * @param progress its progress value
     */
    // TODO: Move outside of Job class
    public void reportProgress(String opName, Integer progress) {
        HashMap<String, Integer> partialProgress = new HashMap<>();
        partialProgress.put(opName, progress);
        try {
            this.monitor.updateProgress(partialProgress);
        } catch (IOException e) {
            // Best effort: keep the job running, but do not hide the failure on stderr.
            this.logger.warn("Failed to report progress of {}.", opName, e);
        }
    }

    @Override
    protected void doExecute() {
        // Make sure that each job is only executed once.
        if (this.hasBeenExecuted.getAndSet(true)) {
            throw new WayangException("Job has already been executed.");
        }

        try {
            // Prepare the #wayangPlan for the optimization.
            this.optimizationRound.start();
            this.prepareWayangPlan();

            // Estimate cardinalities and execution times for the #wayangPlan.
            this.estimateKeyFigures();

            // Get an execution plan.
            int executionId = 0;
            final ExecutionPlan executionPlan = this.createInitialExecutionPlan();
            this.optimizationRound.stop();
            this.captureExecutionPlan(executionPlan, executionId);

            // TODO: generate run ID. For now we fix this because we can't handle multiple jobs, neither in montoring nor execution.
            final String runId = "1";
            try {
                this.monitor.initialize(this.configuration, runId, executionPlan.toJsonList());
            } catch (Exception e) {
                // Pass the exception as throwable (not as a placeholder argument) so its stack trace is logged.
                this.logger.warn("Failed to initialize monitor.", e);
            }

            // Take care of the execution: whenever it pauses at a breakpoint, post-process and possibly re-optimize.
            while (!this.execute(executionPlan, executionId)) {
                this.optimizationRound.start();
                if (this.postProcess(executionPlan, executionId)) {
                    executionId++;
                    this.captureExecutionPlan(executionPlan, executionId);
                }
                this.optimizationRound.stop();
            }

            this.stopWatch.start("Post-processing");
            if (this.configuration.getBooleanProperty("wayang.core.log.enabled")) {
                this.logExecution();
            }

        } catch (WayangException e) {
            throw e;
        } catch (Throwable t) {
            throw new WayangException("Job execution failed.", t);
        } finally {
            this.stopWatch.stopAll();
            this.stopWatch.start("Post-processing", "Release Resources");
            this.releaseResources();
            this.stopWatch.stop("Post-processing");
            this.logger.info("StopWatch results:\n{}", this.stopWatch.toPrettyString());
        }
    }

    /**
     * Records a snapshot of the given {@link ExecutionPlan} in the {@link #experiment}, if there is one.
     *
     * @param executionPlan the {@link ExecutionPlan} to capture
     * @param executionId   identifies the current (re-)optimization round
     */
    private void captureExecutionPlan(ExecutionPlan executionPlan, int executionId) {
        if (this.experiment != null) {
            this.experiment.addMeasurement(ExecutionPlanMeasurement.capture(
                    executionPlan,
                    String.format("execution-plan-%d", executionId)
            ));
        }
    }

    /**
     * Prepares the {@link #wayangPlan}: prunes unused {@link Operator}s, isolates loops, and applies all available
     * {@link PlanTransformation}s.
     */
    private void prepareWayangPlan() {
        this.logger.info("Preparing plan...");

        // Prepare the WayangPlan for the optimization.
        this.optimizationRound.start("Prepare", "Prune&Isolate");
        this.wayangPlan.prepare();
        this.optimizationRound.stop("Prepare", "Prune&Isolate");

        // Apply the mappings to the plan to form a hyperplan.
        this.optimizationRound.start("Prepare", "Transformations");
        final Collection<PlanTransformation> transformations = this.gatherTransformations();
        this.wayangPlan.applyTransformations(transformations);
        this.optimizationRound.stop("Prepare", "Transformations");

        this.optimizationRound.start("Prepare", "Sanity");
        assert this.wayangPlan.isSane();
        this.optimizationRound.stop("Prepare", "Sanity");

        this.optimizationRound.stop("Prepare");
    }

    /**
     * Gather all available {@link PlanTransformation}s from the {@link #configuration}. A transformation is kept if it
     * has no target platforms or if all of its target platforms are provided by the {@link #configuration}.
     */
    private Collection<PlanTransformation> gatherTransformations() {
        final Set<Platform> platforms = WayangCollections.asSet(this.configuration.getPlatformProvider().provideAll());
        return this.configuration.getMappingProvider().provideAll().stream()
                .flatMap(mapping -> mapping.getTransformations().stream())
                .filter(t -> t.getTargetPlatforms().isEmpty() || platforms.containsAll(t.getTargetPlatforms()))
                .collect(Collectors.toList());
    }

    /**
     * Go over the given {@link WayangPlan} and estimate the cardinalities of data being passed between its
     * {@link Operator}s and the execution profile and time of {@link ExecutionOperator}s. The
     * {@link #optimizationContext} and {@link #cardinalityEstimatorManager} are created lazily on the first call.
     */
    private void estimateKeyFigures() {
        this.logger.info("Estimating cardinalities and execution load...");

        this.optimizationRound.start("Cardinality&Load Estimation");
        if (this.cardinalityEstimatorManager == null) {
            this.optimizationRound.start("Cardinality&Load Estimation", "Create OptimizationContext");
            this.optimizationContext = DefaultOptimizationContext.createFrom(this);
            this.optimizationRound.stop("Cardinality&Load Estimation", "Create OptimizationContext");

            this.optimizationRound.start("Cardinality&Load Estimation", "Create CardinalityEstimationManager");
            this.cardinalityEstimatorManager = new CardinalityEstimatorManager(
                    this.wayangPlan, this.optimizationContext, this.configuration);
            this.optimizationRound.stop("Cardinality&Load Estimation", "Create CardinalityEstimationManager");
        }

        this.optimizationRound.start("Cardinality&Load Estimation", "Push Estimation");
        this.cardinalityEstimatorManager.pushCardinalities();
        this.optimizationRound.stop("Cardinality&Load Estimation", "Push Estimation");

        this.optimizationRound.stop("Cardinality&Load Estimation");
    }

    /**
     * Determine a good/the best execution plan from a given {@link WayangPlan}.
     *
     * @return the compiled {@link ExecutionPlan}
     * @throws WayangException if no execution plan could be found
     */
    private ExecutionPlan createInitialExecutionPlan() {
        this.logger.info("Enumerating execution plans...");

        this.optimizationRound.start("Create Initial Execution Plan");

        // Enumerate all possible plans.
        final PlanEnumerator planEnumerator = this.createPlanEnumerator();
        final TimeMeasurement enumerateMeasurement =
                this.optimizationRound.start("Create Initial Execution Plan", "Enumerate");
        planEnumerator.setTimeMeasurement(enumerateMeasurement);
        final PlanEnumeration comprehensiveEnumeration = planEnumerator.enumerate(true);
        planEnumerator.setTimeMeasurement(null);
        this.optimizationRound.stop("Create Initial Execution Plan", "Enumerate");

        final Collection<PlanImplementation> executionPlans = comprehensiveEnumeration.getPlanImplementations();
        this.logEnumeratedPlans(executionPlans);

        // Pick an execution plan (this fails if there is none).
        this.optimizationRound.start("Create Initial Execution Plan", "Pick Best Plan");
        final PlanImplementation bestPlan = this.pickBestExecutionPlan(executionPlans, null, null, null);
        this.timeEstimates.add(bestPlan.getTimeEstimate());
        this.costEstimates.add(bestPlan.getCostEstimate());
        this.optimizationRound.stop("Create Initial Execution Plan", "Pick Best Plan");

        this.logger.info("Compiling execution plan...");
        this.optimizationRound.start("Create Initial Execution Plan", "Split Stages");
        final ExecutionTaskFlow executionTaskFlow = ExecutionTaskFlow.createFrom(bestPlan);
        final ExecutionPlan executionPlan = ExecutionPlan.createFrom(executionTaskFlow, this.stageSplittingCriterion);
        this.optimizationRound.stop("Create Initial Execution Plan", "Split Stages");

        bestPlan.mergeJunctionOptimizationContexts();
        bestPlan.logTimeEstimates();

        //assert executionPlan.isSane();

        this.optimizationRound.stop("Create Initial Execution Plan");
        return executionPlan;
    }

    /**
     * Logs, at debug level, how many {@link PlanImplementation}s have been enumerated and their {@link Operator}s.
     *
     * @param executionPlans the enumerated {@link PlanImplementation}s
     */
    private void logEnumeratedPlans(Collection<PlanImplementation> executionPlans) {
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        this.logger.debug("Enumerated {} plans.", executionPlans.size());
        for (PlanImplementation candidate : executionPlans) {
            this.logger.debug("Plan with operators: {}", candidate.getOperators());
        }
    }

    /**
     * Picks the {@link PlanImplementation} with the lowest squashed cost estimate and stores it in
     * {@link #planImplementation}. On ties, the candidate encountered later wins.
     *
     * @param executionPlans the candidates
     * @param existingPlan   <i>currently not used</i>
     * @param openChannels   <i>currently not used</i>
     * @param executedStages <i>currently not used</i>
     * @return the picked {@link PlanImplementation}
     * @throws WayangException if {@code executionPlans} is empty
     */
    private PlanImplementation pickBestExecutionPlan(Collection<PlanImplementation> executionPlans,
                                                     ExecutionPlan existingPlan,
                                                     Set<Channel> openChannels,
                                                     Set<ExecutionStage> executedStages) {
        final PlanImplementation bestPlanImplementation = executionPlans.stream()
                .reduce((p1, p2) -> {
                    final double t1 = p1.getSquashedCostEstimate();
                    final double t2 = p2.getSquashedCostEstimate();
                    return t1 < t2 ? p1 : p2;
                })
                .orElseThrow(() -> new WayangException("Could not find an execution plan."));
        this.logger.info("Picked {} as best plan.", bestPlanImplementation);
        this.planImplementation = bestPlanImplementation;
        return bestPlanImplementation;
    }

    /**
     * Go over the given {@link WayangPlan} and update the cardinalities of data being passed between its
     * {@link Operator}s using the given {@link ExecutionState}.
     *
     * @return whether any cardinalities have been updated
     */
    private boolean reestimateCardinalities(ExecutionState executionState) {
        return this.cardinalityEstimatorManager.pushCardinalityUpdates(executionState, this.planImplementation);
    }

    /**
     * Creates a new {@link PlanEnumerator} for the {@link #wayangPlan} and {@link #configuration}.
     */
    private PlanEnumerator createPlanEnumerator() {
        return this.createPlanEnumerator(null, null);
    }

    /**
     * Creates a new {@link PlanEnumerator} for the {@link #wayangPlan} and {@link #configuration}.
     *
     * @param existingPlan an already (partially) executed {@link ExecutionPlan} to build upon, or {@code null}
     * @param openChannels {@link Channel}s of {@code existingPlan} that still need to be consumed
     */
    private PlanEnumerator createPlanEnumerator(ExecutionPlan existingPlan, Set<Channel> openChannels) {
        return existingPlan == null ?
                new PlanEnumerator(this.wayangPlan, this.optimizationContext) :
                new PlanEnumerator(this.wayangPlan, this.optimizationContext, existingPlan, openChannels);
    }

    /**
     * Start executing the given {@link ExecutionPlan} with all bells and whistles, such as instrumentation,
     * logging of the plan, and measuring the execution time.
     *
     * @param executionPlan that should be executed
     * @param executionId   an identifier for the current execution
     * @return whether the execution of the {@link ExecutionPlan} is completed
     */
    private boolean execute(ExecutionPlan executionPlan, int executionId) {
        final TimeMeasurement currentExecutionRound =
                this.executionRound.start(String.format("Execution %d", executionId));
        try {
            // Ensure existence of the #crossPlatformExecutor.
            if (this.crossPlatformExecutor == null) {
                final InstrumentationStrategy instrumentation =
                        this.configuration.getInstrumentationStrategyProvider().provide();
                this.crossPlatformExecutor = new CrossPlatformExecutor(this, instrumentation);
            }

            if (this.configuration.getOptionalBooleanProperty("wayang.core.debug.skipexecution").orElse(false)) {
                return true;
            }
            if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) {
                this.setUpBreakpoint(executionPlan, currentExecutionRound);
            }

            // Log the current executionPlan.
            this.logStages(executionPlan);

            // Trigger the execution.
            currentExecutionRound.start("Execute");
            return this.crossPlatformExecutor.executeUntilBreakpoint(executionPlan, this.optimizationContext);
        } finally {
            // Stop the execution round on every exit path (completion, skipped execution, or failure), so that
            // its accumulated time does not include subsequent optimization or post-processing.
            this.executionRound.stop();
        }
    }

    /**
     * Sets up a {@link Breakpoint} for an {@link ExecutionPlan}.
     *
     * @param executionPlan for that the {@link Breakpoint} should be set
     * @param round         {@link TimeMeasurement} to be extended for any interesting time measurements
     */
    private void setUpBreakpoint(ExecutionPlan executionPlan, TimeMeasurement round) {

        // Set up appropriate Breakpoints.
        final TimeMeasurement breakpointRound = round.start("Configure Breakpoint");
        FixBreakpoint immediateBreakpoint = new FixBreakpoint();
        final Set<ExecutionStage> completedStages = this.crossPlatformExecutor.getCompletedStages();
        if (completedStages.isEmpty()) {
            executionPlan.getStartingStages().forEach(immediateBreakpoint::breakAfter);
        } else {
            // Break after the not-yet-completed direct successors of the completed stages.
            completedStages.stream()
                    .flatMap(stage -> stage.getSuccessors().stream())
                    .filter(stage -> !completedStages.contains(stage))
                    .forEach(immediateBreakpoint::breakAfter);
        }
        this.crossPlatformExecutor.setBreakpoint(new ConjunctiveBreakpoint(
                immediateBreakpoint,
                this.cardinalityBreakpoint,
                new NoIterationBreakpoint() // Avoid re-optimization inside of loops.
        ));
        breakpointRound.stop();
    }

    /**
     * Logs the given {@link ExecutionPlan} at info level.
     *
     * @param executionPlan the {@link ExecutionPlan} to log
     */
    private void logStages(ExecutionPlan executionPlan) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Current execution plan:\n{}", executionPlan.toExtensiveString());
        }
    }

    /**
     * Injects the cardinalities obtained from {@link Channel} instrumentation, potentially updates the {@link ExecutionPlan}
     * through re-optimization, and collects measured data.
     *
     * @return whether the {@link ExecutionPlan} has been re-optimized
     */
    private boolean postProcess(ExecutionPlan executionPlan, int executionId) {
        if (this.crossPlatformExecutor.isVetoingPlanChanges()) {
            this.logger.info("The cross-platform executor is currently not allowing re-optimization.");
            return false;
        }

        final TimeMeasurement round = this.optimizationRound.start(String.format("Post-processing %d", executionId));

        round.start("Reestimate Cardinalities&Time");
        boolean isCardinalitiesUpdated = this.reestimateCardinalities(this.crossPlatformExecutor);
        round.stop("Reestimate Cardinalities&Time");

        round.start("Update Execution Plan");
        if (isCardinalitiesUpdated) {
            this.logger.info("Re-optimizing execution plan.");
            this.updateExecutionPlan(executionPlan);
        } else {
            this.logger.info("Skipping re-optimization: no new insights on cardinalities.");
            // The plan is unchanged, so it inherits the previous estimates.
            this.timeEstimates.add(this.timeEstimates.get(this.timeEstimates.size() - 1));
            this.costEstimates.add(this.costEstimates.get(this.costEstimates.size() - 1));
        }
        round.stop("Update Execution Plan");

        round.stop();

        return true;
    }

    /**
     * Enumerate possible execution plans for the not-yet-executed part of the {@link WayangPlan}, determine the
     * (seemingly) best one, and expand the given {@link ExecutionPlan} with it. Unexecuted parts of the
     * {@link ExecutionPlan} are discarded first; {@link Channel}s that still have to be consumed by unexecuted
     * {@link ExecutionTask}s are kept open.
     *
     * @param executionPlan the partially executed {@link ExecutionPlan}; modified in place
     */
    private void updateExecutionPlan(ExecutionPlan executionPlan) {
        final Set<ExecutionStage> completedStages = this.crossPlatformExecutor.getCompletedStages();

        // Find Channels that have yet to be consumed by unexecuted ExecutionTasks and scrap unexecuted bits of the plan.
        final Set<Channel> openChannels = executionPlan.retain(completedStages);

        // Enumerate all possible plans.
        final PlanEnumerator planEnumerator = this.createPlanEnumerator(executionPlan, openChannels);
        final PlanEnumeration comprehensiveEnumeration = planEnumerator.enumerate(true);
        final Collection<PlanImplementation> executionPlans = comprehensiveEnumeration.getPlanImplementations();
        this.logEnumeratedPlans(executionPlans);

        // Pick an execution plan (this fails if there is none).
        final PlanImplementation bestPlan =
                this.pickBestExecutionPlan(executionPlans, executionPlan, openChannels, completedStages);
        this.timeEstimates.add(bestPlan.getTimeEstimate());
        this.costEstimates.add(bestPlan.getCostEstimate());

        // Compile the picked plan and attach it to the retained part of the execution plan.
        final ExecutionTaskFlow executionTaskFlow = ExecutionTaskFlow.recreateFrom(
                bestPlan, executionPlan, openChannels, completedStages
        );
        final ExecutionPlan executionPlanExpansion = ExecutionPlan.createFrom(executionTaskFlow, this.stageSplittingCriterion);
        executionPlan.expand(executionPlanExpansion);

        bestPlan.mergeJunctionOptimizationContexts();

        assert executionPlan.isSane();
    }

    /**
     * Asks this instance to release its critical resources to avoid resource leaks and to enhance durability and
     * consistency of accessed resources.
     */
    private void releaseResources() {
        this.wayangContext.getCardinalityRepository().sleep();
        if (this.crossPlatformExecutor != null) {
            this.crossPlatformExecutor.shutdown();
        }
    }

    /**
     * Stores the final cardinalities and partial executions, records measured and estimated execution times and
     * costs in the {@link #experiment}, and logs summary figures.
     */
    private void logExecution() {
        this.stopWatch.start("Post-processing", "Log measurements");

        // For the last time, update the cardinalities and store them.
        this.reestimateCardinalities(this.crossPlatformExecutor);
        final CardinalityRepository cardinalityRepository = this.wayangContext.getCardinalityRepository();
        cardinalityRepository.storeAll(this.crossPlatformExecutor, this.optimizationContext);

        // Execution times.
        final Collection<PartialExecution> partialExecutions = this.crossPlatformExecutor.getPartialExecutions();

        // Add the execution times to the experiment.
        int nextPartialExecutionMeasurementId = 0;
        for (PartialExecution partialExecution : partialExecutions) {
            if (this.logger.isDebugEnabled()) {
                this.logInexactCardinalities(partialExecution);
            }
            String id = String.format("par-ex-%03d", nextPartialExecutionMeasurementId++);
            final PartialExecutionMeasurement measurement = new PartialExecutionMeasurement(id, partialExecution, this.configuration);
            this.experiment.addMeasurement(measurement);
        }
        // Feed the execution log.
        try (ExecutionLog executionLog = ExecutionLog.open(this.configuration)) {
            executionLog.storeAll(partialExecutions);
        } catch (Exception e) {
            this.logger.error("Storing partial executions failed.", e);
        }
        // Stop the measurement on the same StopWatch path on which it was started.
        this.stopWatch.stop("Post-processing", "Log measurements");

        // Log the execution time.
        long effectiveExecutionMillis = partialExecutions.stream()
                .mapToLong(PartialExecution::getMeasuredExecutionTime)
                .sum();
        long measuredExecutionMillis = this.executionRound.getMillis();
        this.logger.info(
                "Accumulated execution time: {} (effective: {}, overhead: {})",
                Formats.formatDuration(measuredExecutionMillis, true),
                Formats.formatDuration(effectiveExecutionMillis, true),
                Formats.formatDuration(measuredExecutionMillis - effectiveExecutionMillis, true)
        );
        int i = 1;
        for (TimeEstimate timeEstimate : this.timeEstimates) {
            this.logger.info("Estimated execution time (plan {}): {}", i, timeEstimate);
            TimeMeasurement lowerEstimate = new TimeMeasurement(String.format("Estimate %d (lower)", i));
            lowerEstimate.setMillis(timeEstimate.getLowerEstimate());
            this.stopWatch.getExperiment().addMeasurement(lowerEstimate);
            TimeMeasurement upperEstimate = new TimeMeasurement(String.format("Estimate %d (upper)", i));
            upperEstimate.setMillis(timeEstimate.getUpperEstimate());
            this.stopWatch.getExperiment().addMeasurement(upperEstimate);
            i++;
        }

        // Log the cost settings.
        final Collection<Platform> consideredPlatforms = this.configuration.getPlatformProvider().provideAll();
        for (Platform consideredPlatform : consideredPlatforms) {
            final TimeToCostConverter timeToCostConverter = this.configuration
                    .getTimeToCostConverterProvider()
                    .provideFor(consideredPlatform);
            this.experiment.getSubject().addConfiguration(
                    String.format("Costs per ms (%s)", consideredPlatform.getName()),
                    timeToCostConverter.getCostsPerMillisecond()
            );
            this.experiment.getSubject().addConfiguration(
                    String.format("Fix costs (%s)", consideredPlatform.getName()),
                    timeToCostConverter.getFixCosts()
            );
        }

        // Log the execution costs.
        double fixCosts = partialExecutions.stream()
                .flatMap(partialExecution -> partialExecution.getInvolvedPlatforms().stream())
                .map(platform -> this.configuration.getTimeToCostConverterProvider().provideFor(platform).getFixCosts())
                .reduce(0d, Double::sum);
        double effectiveLowerCosts = fixCosts + partialExecutions.stream()
                .map(PartialExecution::getMeasuredLowerCost)
                .reduce(0d, Double::sum);
        double effectiveUpperCosts = fixCosts + partialExecutions.stream()
                .map(PartialExecution::getMeasuredUpperCost)
                .reduce(0d, Double::sum);
        this.logger.info("Accumulated costs: {} .. {}",
                String.format("%,.2f", effectiveLowerCosts),
                String.format("%,.2f", effectiveUpperCosts)
        );
        this.experiment.addMeasurement(
                new CostMeasurement("Measured cost", effectiveLowerCosts, effectiveUpperCosts, 1d)
        );
        i = 1;
        for (ProbabilisticDoubleInterval costEstimate : this.costEstimates) {
            this.logger.info("Estimated costs (plan {}): {}", i, costEstimate);
            this.experiment.addMeasurement(new CostMeasurement(
                    String.format("Estimated costs (%d)", i),
                    costEstimate.getLowerEstimate(),
                    costEstimate.getUpperEstimate(),
                    costEstimate.getCorrectnessProbability()
            ));
            i++;
        }

        // Log some plan metrics.
        final PlanMetrics planMetrics = PlanMetrics.createFor(this.wayangPlan, "Plan Metrics");
        this.logger.info("Plan metrics: {} virtual operators, {} execution operators, {} alternatives, {} combinations",
                planMetrics.getNumVirtualOperators(),
                planMetrics.getNumExecutionOperators(),
                planMetrics.getNumAlternatives(),
                planMetrics.getNumCombinations()
        );
        this.experiment.addMeasurement(planMetrics);
    }

    /**
     * Logs, at debug level, every non-exact input and output {@link CardinalityEstimate} of the
     * {@link OptimizationContext.OperatorContext}s within the given {@link PartialExecution}.
     *
     * @param partialExecution whose {@link AtomicExecutionGroup}s should be inspected
     */
    private void logInexactCardinalities(PartialExecution partialExecution) {
        for (AtomicExecutionGroup atomicExecutionGroup : partialExecution.getAtomicExecutionGroups()) {
            if (!(atomicExecutionGroup.getEstimationContext() instanceof OptimizationContext.OperatorContext)) {
                continue;
            }
            final OptimizationContext.OperatorContext operatorContext =
                    (OptimizationContext.OperatorContext) atomicExecutionGroup.getEstimationContext();
            for (CardinalityEstimate cardinality : operatorContext.getInputCardinalities()) {
                if (cardinality != null && !cardinality.isExact()) {
                    this.logger.debug(
                            "Inexact input cardinality estimate {} for {}.",
                            cardinality, operatorContext.getOperator()
                    );
                }
            }
            for (CardinalityEstimate cardinality : operatorContext.getOutputCardinalities()) {
                if (cardinality != null && !cardinality.isExact()) {
                    this.logger.debug(
                            "Inexact output cardinality estimate {} for {}.",
                            cardinality, operatorContext.getOperator()
                    );
                }
            }
        }
    }

    /**
     * Whether a {@link Breakpoint} is requested for the given {@link OutputSlot}. This is only the case for proactive
     * re-optimization, outside of loops, when the {@link #cardinalityBreakpoint} does not approve the output's
     * {@link CardinalityEstimate}.
     *
     * @param output          the {@link OutputSlot}
     * @param operatorContext the {@link OptimizationContext.OperatorContext} for the {@link OutputSlot} owner
     * @return whether a {@link Breakpoint} is requested
     */
    public boolean isRequestBreakpointFor(OutputSlot<?> output, OptimizationContext.OperatorContext operatorContext) {
        return this.isProactiveReoptimization
                && output.getOwner().getInnermostLoop() == null
                && this.cardinalityBreakpoint != null
                && !this.cardinalityBreakpoint.approves(operatorContext.getOutputCardinality(output.getIndex()));
    }

    /**
     * Modify the {@link Configuration} to control the {@link Job} execution.
     */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Provide the paths of the JAR files needed to execute the UDFs of this instance.
     *
     * @return the (live, mutable) set of JAR paths
     */
    public Set<String> getUdfJarPaths() {
        return this.udfJarPaths;
    }

    /**
     * Provide the {@link CrossPlatformExecutor} used during the execution of this instance.
     *
     * @return the {@link CrossPlatformExecutor} or {@code null} if there is none allocated
     */
    public CrossPlatformExecutor getCrossPlatformExecutor() {
        return this.crossPlatformExecutor;
    }

    /**
     * Provide the {@link OptimizationContext} of the {@link #wayangPlan}.
     *
     * @return the {@link DefaultOptimizationContext} or {@code null} if key figures have not been estimated yet
     */
    public DefaultOptimizationContext getOptimizationContext() {
        return this.optimizationContext;
    }

    /**
     * Retrieves the name of this instance.
     *
     * @return the name
     */
    public String getName() {
        return this.name;
    }

    @Override
    public String toString() {
        return String.format("%s[%s]", this.getClass().getSimpleName(), this.name);
    }

    /**
     * Provide the {@link Experiment} being recorded with the execution of this instance.
     *
     * @return the {@link Experiment}
     */
    public Experiment getExperiment() {
        return this.experiment;
    }

    /**
     * Provide the {@link WayangPlan} executed by this instance.
     *
     * @return the {@link WayangPlan}
     */
    public WayangPlan getWayangPlan() {
        return this.wayangPlan;
    }

    /**
     * Provide the {@link StopWatch} that is used to instrument the execution of this instance.
     *
     * @return the {@link StopWatch}
     */
    public StopWatch getStopWatch() {
        return this.stopWatch;
    }

    /**
     * Provides a general-purpose cache. Can be used to communicate job-global information.
     *
     * @return the cache
     */
    public Map<String, Object> getCache() {
        return this.cache;
    }
}