| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.api; |
| |
| import org.apache.wayang.commons.util.profiledb.instrumentation.StopWatch; |
| import org.apache.wayang.commons.util.profiledb.model.Experiment; |
| import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.mapping.PlanTransformation; |
| import org.apache.wayang.core.monitor.DisabledMonitor; |
| import org.apache.wayang.core.monitor.FileMonitor; |
| import org.apache.wayang.core.monitor.Monitor; |
| import org.apache.wayang.core.optimizer.DefaultOptimizationContext; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimatorManager; |
| import org.apache.wayang.core.optimizer.costs.TimeEstimate; |
| import org.apache.wayang.core.optimizer.costs.TimeToCostConverter; |
| import org.apache.wayang.core.optimizer.enumeration.ExecutionTaskFlow; |
| import org.apache.wayang.core.optimizer.enumeration.PlanEnumeration; |
| import org.apache.wayang.core.optimizer.enumeration.PlanEnumerator; |
| import org.apache.wayang.core.optimizer.enumeration.PlanImplementation; |
| import org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal; |
| import org.apache.wayang.core.plan.executionplan.Channel; |
| import org.apache.wayang.core.plan.executionplan.ExecutionPlan; |
| import org.apache.wayang.core.plan.executionplan.ExecutionStage; |
| import org.apache.wayang.core.plan.executionplan.ExecutionTask; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.plan.wayangplan.Operator; |
| import org.apache.wayang.core.plan.wayangplan.OutputSlot; |
| import org.apache.wayang.core.plan.wayangplan.PlanMetrics; |
| import org.apache.wayang.core.plan.wayangplan.WayangPlan; |
| import org.apache.wayang.core.platform.AtomicExecutionGroup; |
| import org.apache.wayang.core.platform.Breakpoint; |
| import org.apache.wayang.core.platform.CardinalityBreakpoint; |
| import org.apache.wayang.core.platform.ConjunctiveBreakpoint; |
| import org.apache.wayang.core.platform.CrossPlatformExecutor; |
| import org.apache.wayang.core.platform.ExecutionState; |
| import org.apache.wayang.core.platform.FixBreakpoint; |
| import org.apache.wayang.core.platform.NoIterationBreakpoint; |
| import org.apache.wayang.core.platform.PartialExecution; |
| import org.apache.wayang.core.platform.Platform; |
| import org.apache.wayang.core.profiling.CardinalityRepository; |
| import org.apache.wayang.core.profiling.CostMeasurement; |
| import org.apache.wayang.core.profiling.ExecutionLog; |
| import org.apache.wayang.core.profiling.ExecutionPlanMeasurement; |
| import org.apache.wayang.core.profiling.InstrumentationStrategy; |
| import org.apache.wayang.core.profiling.PartialExecutionMeasurement; |
| import org.apache.wayang.core.util.Formats; |
| import org.apache.wayang.core.util.OneTimeExecutable; |
| import org.apache.wayang.core.util.ReflectionUtils; |
| import org.apache.wayang.core.util.WayangCollections; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Describes a job that is to be executed using Wayang. |
| */ |
| public class Job extends OneTimeExecutable { |
| |
| /** |
| * Logger of this instance, obtained for its runtime class. |
| */ |
| private final Logger logger = LogManager.getLogger(this.getClass()); |
| |
| /** |
| * Guardian to avoid re-execution: set to {@code true} by the first call of {@link #doExecute()}. |
| */ |
| private final AtomicBoolean hasBeenExecuted = new AtomicBoolean(false); |
| |
| /** |
| * References the {@link WayangContext} that spawned this instance. |
| */ |
| private final WayangContext wayangContext; |
| |
| /** |
| * {@link Job}-level {@link Configuration} based on the {@link WayangContext}-level configuration. |
| */ |
| private final Configuration configuration; |
| |
| /** |
| * The {@link WayangPlan} to be executed by this instance. |
| */ |
| private final WayangPlan wayangPlan; |
| |
| /** |
| * {@link OptimizationContext} for the {@link #wayangPlan}. Created by {@link #estimateKeyFigures()}, |
| * hence {@code null} until then. |
| */ |
| private DefaultOptimizationContext optimizationContext; |
| |
| /** |
| * General purpose cache. |
| */ |
| private Map<String, Object> cache = new HashMap<>(); |
| |
| /** |
| * Executes the optimized {@link ExecutionPlan}. Allocated lazily by {@link #execute(ExecutionPlan, int)}. |
| */ |
| private CrossPlatformExecutor crossPlatformExecutor; |
| |
| /** |
| * Manages the {@link CardinalityEstimate}s for the {@link #wayangPlan}. |
| */ |
| private CardinalityEstimatorManager cardinalityEstimatorManager; |
| |
| /** |
| * Collects metadata w.r.t. the processing of this instance. |
| */ |
| private final Experiment experiment; |
| |
| /** |
| * {@link StopWatch} to measure some key figures for the {@link #experiment}. |
| */ |
| private final StopWatch stopWatch; |
| |
| /** |
| * {@link TimeMeasurement}s for the optimization and the execution phases. |
| */ |
| private final TimeMeasurement optimizationRound, executionRound; |
| |
| /** |
| * Collects the {@link TimeEstimate}s of all (partially) executed {@link PlanImplementation}s. |
| */ |
| private List<TimeEstimate> timeEstimates = new LinkedList<>(); |
| |
| /** |
| * Collects the cost estimates of all (partially) executed {@link PlanImplementation}s. |
| */ |
| private List<ProbabilisticDoubleInterval> costEstimates = new LinkedList<>(); |
| |
| /** |
| * JAR files that are needed to execute the UDFs. |
| */ |
| private final Set<String> udfJarPaths = new HashSet<>(); |
| |
| /** |
| * Receives the execution plan and progress updates of this instance. The constructor installs a |
| * {@link DisabledMonitor} when monitoring is not enabled in the {@link #configuration}. |
| */ |
| private Monitor monitor; |
| |
| /** |
| * Name for this instance. |
| */ |
| private final String name; |
| |
| /** |
| * Criterion handed to {@link ExecutionPlan#createFrom} when splitting stages. This implementation |
| * always evaluates to {@code false}. |
| */ |
| private final StageAssignmentTraversal.StageSplittingCriterion stageSplittingCriterion = |
| (producerTask, channel, consumerTask) -> false; |
| |
| /** |
| * The {@link PlanImplementation} that is being executed. |
| */ |
| private PlanImplementation planImplementation; |
| |
| /** |
| * Controls at which {@link CardinalityEstimate}s the execution should be interrupted. |
| * {@code null} if re-optimization was disabled when this instance was created. |
| */ |
| private final CardinalityBreakpoint cardinalityBreakpoint; |
| |
| /** |
| * Whether proactive re-optimization was requested via {@code wayang.core.optimizer.reoptimize.proactive}. |
| * Always {@code false} if re-optimization was disabled when this instance was created. |
| */ |
| private final boolean isProactiveReoptimization; |
| |
| /** |
| * Creates a new instance. |
| * |
| * @param name name for this instance or {@code null} if a default name should be picked |
| * @param experiment an {@link Experiment} for that profiling entries will be created |
| * @param udfJars paths to JAR files needed to run the UDFs (see {@link ReflectionUtils#getDeclaringJar(Class)}) |
| */ |
| Job(WayangContext wayangContext, String name, Monitor monitor, WayangPlan wayangPlan, Experiment experiment, String... udfJars) { |
| this.wayangContext = wayangContext; |
| this.name = name == null ? "Wayang app" : name; |
| this.configuration = this.wayangContext.getConfiguration().fork(this.name); |
| this.wayangPlan = wayangPlan; |
| for (String udfJar : udfJars) { |
| this.addUdfJar(udfJar); |
| } |
| |
| // Prepare re-optimization. |
| if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) { |
| this.cardinalityBreakpoint = new CardinalityBreakpoint(this.configuration); |
| this.isProactiveReoptimization = |
| this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize.proactive", false); |
| } else { |
| this.cardinalityBreakpoint = null; |
| this.isProactiveReoptimization = false; |
| } |
| |
| // Prepare instrumentation. |
| this.experiment = experiment; |
| this.stopWatch = new StopWatch(experiment); |
| this.optimizationRound = this.stopWatch.getOrCreateRound("Optimization"); |
| this.executionRound = this.stopWatch.getOrCreateRound("Execution"); |
| |
| // Configure job monitor. |
| if (Monitor.isEnabled(this.configuration)) { |
| this.monitor = monitor == null ? new FileMonitor() : monitor; |
| } else { |
| this.monitor = new DisabledMonitor(); |
| } |
| } |
| |
| /** |
| * Adds a {@code path} to a JAR that is required in one or more UDFs. The path is added to the set |
| * exposed by {@link #getUdfJarPaths()}, so adding the same path twice has no further effect. |
| * |
| * @param path the path to the JAR file |
| * @see ReflectionUtils#getDeclaringJar(Class) |
| */ |
| public void addUdfJar(String path) { |
| this.udfJarPaths.add(path); |
| } |
| |
| /** |
| * Runs this instance by delegating to the superclass. Must only be called once. |
| * |
| * @throws WayangException in case the execution fails for any reason; a {@link WayangException} is passed |
| * on as is, any other {@link Throwable} is wrapped into one |
| */ |
| @Override |
| public void execute() throws WayangException { |
| try { |
| super.execute(); |
| } catch (Throwable failure) { |
| if (failure instanceof WayangException) { |
| throw (WayangException) failure; |
| } |
| throw new WayangException("Job execution failed.", failure); |
| } |
| } |
| |
| /** |
| * Prepares the {@link WayangPlan}, estimates its key figures, and compiles the initial |
| * {@link ExecutionPlan} without executing it. |
| * |
| * @return the initial {@link ExecutionPlan} |
| * @throws WayangException if no execution plan could be found |
| */ |
| public ExecutionPlan buildInitialExecutionPlan() throws WayangException { |
| this.prepareWayangPlan(); |
| this.estimateKeyFigures(); |
| return this.createInitialExecutionPlan(); |
| } |
| |
| // TODO: Move outside of Job class |
| /** |
| * Forwards the progress of a single operator to the monitor of this instance. |
| * |
| * @param opName name of the operator whose progress is reported |
| * @param progress the progress value to report for that operator |
| */ |
| public void reportProgress(String opName, Integer progress) { |
| HashMap<String, Integer> partialProgress = new HashMap<>(); |
| partialProgress.put(opName, progress); |
| try { |
| this.monitor.updateProgress(partialProgress); |
| } catch (IOException e) { |
| // A failed progress report must not abort the job: log it and carry on. |
| this.logger.warn("Failed to report progress for operator " + opName + ".", e); |
| } |
| } |
| |
| /** |
| * Runs the complete job: optimizes the {@link #wayangPlan}, initializes the monitor, executes the |
| * resulting {@link ExecutionPlan} (post-processing it after every interrupted execution round), and |
| * finally logs the execution if {@code wayang.core.log.enabled} is set. Resources are released and the |
| * {@link StopWatch} results are logged in any case. |
| * |
| * @throws WayangException if this instance has been executed before or if the execution fails |
| */ |
| @Override |
| protected void doExecute() { |
| // Make sure that each job is only executed once. |
| if (this.hasBeenExecuted.getAndSet(true)) { |
| throw new WayangException("Job has already been executed."); |
| } |
| |
| try { |
| |
| // Prepare the #wayangPlan for the optimization. |
| this.optimizationRound.start(); |
| this.prepareWayangPlan(); |
| |
| // Estimate cardinalities and execution times for the #wayangPlan. |
| this.estimateKeyFigures(); |
| |
| // Get an execution plan. |
| int executionId = 0; |
| ExecutionPlan executionPlan = this.createInitialExecutionPlan(); |
| this.optimizationRound.stop(); |
| this.recordExecutionPlan(executionPlan, executionId); |
| |
| // TODO: generate run ID. For now we fix this because we can't handle multiple jobs, neither in monitoring nor execution. |
| String runId = "1"; |
| try { |
| this.monitor.initialize(this.configuration, runId, executionPlan.toJsonList()); |
| } catch (Exception e) { |
| // A failing monitor must not prevent the execution: log the problem and go on. |
| this.logger.warn("Failed to initialize monitor.", e); |
| } |
| |
| |
| // Take care of the execution: run until the next breakpoint, post-process, repeat. |
| while (!this.execute(executionPlan, executionId)) { |
| this.optimizationRound.start(); |
| if (this.postProcess(executionPlan, executionId)) { |
| executionId++; |
| this.recordExecutionPlan(executionPlan, executionId); |
| } |
| this.optimizationRound.stop(); |
| } |
| |
| this.stopWatch.start("Post-processing"); |
| if (this.configuration.getBooleanProperty("wayang.core.log.enabled")) { |
| this.logExecution(); |
| } |
| } catch (WayangException e) { |
| throw e; |
| } catch (Throwable t) { |
| throw new WayangException("Job execution failed.", t); |
| } finally { |
| this.stopWatch.stopAll(); |
| this.stopWatch.start("Post-processing", "Release Resources"); |
| this.releaseResources(); |
| this.stopWatch.stop("Post-processing"); |
| this.logger.info("StopWatch results:\n{}", this.stopWatch.toPrettyString()); |
| } |
| } |
| |
| /** |
| * Adds a capture of the given {@link ExecutionPlan} to the {@link #experiment}, if there is one. |
| * |
| * @param executionPlan the {@link ExecutionPlan} to capture |
| * @param executionId identifies the execution round; becomes part of the measurement ID |
| */ |
| private void recordExecutionPlan(ExecutionPlan executionPlan, int executionId) { |
| if (this.experiment != null) { |
| this.experiment.addMeasurement(ExecutionPlanMeasurement.capture( |
| executionPlan, |
| String.format("execution-plan-%d", executionId) |
| )); |
| } |
| } |
| |
| /** |
| * Gets the {@link #wayangPlan} ready for the optimization: lets the plan prepare itself, applies all |
| * gathered {@link PlanTransformation}s, and (with assertions enabled) checks the plan's sanity. Each |
| * step is timed as a sub-round of "Prepare" in the {@link #optimizationRound}. |
| */ |
| private void prepareWayangPlan() { |
| final String phase = "Prepare"; |
| this.logger.info("Preparing plan..."); |
| |
| // Step 1: let the WayangPlan prepare itself. |
| this.optimizationRound.start(phase, "Prune&Isolate"); |
| this.wayangPlan.prepare(); |
| this.optimizationRound.stop(phase, "Prune&Isolate"); |
| |
| // Step 2: apply the mappings to the plan to form a hyperplan. |
| this.optimizationRound.start(phase, "Transformations"); |
| this.wayangPlan.applyTransformations(this.gatherTransformations()); |
| this.optimizationRound.stop(phase, "Transformations"); |
| |
| // Step 3: sanity check (only evaluated when assertions are enabled). |
| this.optimizationRound.start(phase, "Sanity"); |
| assert this.wayangPlan.isSane(); |
| this.optimizationRound.stop(phase, "Sanity"); |
| |
| this.optimizationRound.stop(phase); |
| } |
| |
| /** |
| * Collects the {@link PlanTransformation}s of all mappings provided by the {@link #configuration}. |
| * A transformation is kept if it declares no target {@link Platform}s or if all of its target |
| * {@link Platform}s are provided by the {@link #configuration}. |
| * |
| * @return the applicable {@link PlanTransformation}s |
| */ |
| private Collection<PlanTransformation> gatherTransformations() { |
| final Collection<Platform> providedPlatforms = this.configuration.getPlatformProvider().provideAll(); |
| final Set<Platform> availablePlatforms = WayangCollections.asSet(providedPlatforms); |
| return this.configuration.getMappingProvider().provideAll().stream() |
| .flatMap(mapping -> mapping.getTransformations().stream()) |
| .filter(transformation -> { |
| if (transformation.getTargetPlatforms().isEmpty()) { |
| return true; |
| } |
| return availablePlatforms.containsAll(transformation.getTargetPlatforms()); |
| }) |
| .collect(Collectors.toList()); |
| } |
| |
| |
| /** |
| * Go over the given {@link WayangPlan} and estimate the cardinalities of data being passed between its |
| * {@link Operator}s and the execution profile and time of {@link ExecutionOperator}s. |
| */ |
| private void estimateKeyFigures() { |
| this.logger.info("Estimating cardinalities and execution load..."); |
| |
| this.optimizationRound.start("Cardinality&Load Estimation"); |
| if (this.cardinalityEstimatorManager == null) { |
| this.optimizationRound.start("Cardinality&Load Estimation", "Create OptimizationContext"); |
| this.optimizationContext = DefaultOptimizationContext.createFrom(this); |
| this.optimizationRound.stop("Cardinality&Load Estimation", "Create OptimizationContext"); |
| |
| this.optimizationRound.start("Cardinality&Load Estimation", "Create CardinalityEstimationManager"); |
| this.cardinalityEstimatorManager = new CardinalityEstimatorManager( |
| this.wayangPlan, this.optimizationContext, this.configuration); |
| this.optimizationRound.stop("Cardinality&Load Estimation", "Create CardinalityEstimationManager"); |
| } |
| |
| this.optimizationRound.start("Cardinality&Load Estimation", "Push Estimation"); |
| this.cardinalityEstimatorManager.pushCardinalities(); |
| this.optimizationRound.stop("Cardinality&Load Estimation", "Push Estimation"); |
| |
| this.optimizationRound.stop("Cardinality&Load Estimation"); |
| } |
| |
| |
| /** |
| * Enumerates the {@link PlanImplementation}s for the {@link #wayangPlan}, picks the one with the least |
| * squashed cost estimate, records its time and cost estimates, and compiles it into an {@link ExecutionPlan}. |
| * |
| * @return the initial {@link ExecutionPlan} |
| * @throws WayangException if the enumeration did not yield any {@link PlanImplementation} |
| */ |
| private ExecutionPlan createInitialExecutionPlan() { |
| final String phase = "Create Initial Execution Plan"; |
| this.logger.info("Enumerating execution plans..."); |
| |
| this.optimizationRound.start(phase); |
| |
| // Enumerate all possible plans; the enumerator reports into the "Enumerate" sub-round meanwhile. |
| final PlanEnumerator enumerator = this.createPlanEnumerator(); |
| |
| final TimeMeasurement enumerationRound = this.optimizationRound.start(phase, "Enumerate"); |
| enumerator.setTimeMeasurement(enumerationRound); |
| final PlanEnumeration enumeration = enumerator.enumerate(true); |
| enumerator.setTimeMeasurement(null); |
| this.optimizationRound.stop(phase, "Enumerate"); |
| |
| final Collection<PlanImplementation> candidates = enumeration.getPlanImplementations(); |
| this.logger.debug("Enumerated {} plans.", candidates.size()); |
| for (PlanImplementation candidate : candidates) { |
| this.logger.debug("Plan with operators: {}", candidate.getOperators()); |
| } |
| |
| // Pick an execution plan; this fails if there is no candidate at all. |
| this.optimizationRound.start(phase, "Pick Best Plan"); |
| final PlanImplementation bestPlan = this.pickBestExecutionPlan(candidates, null, null, null); |
| this.timeEstimates.add(bestPlan.getTimeEstimate()); |
| this.costEstimates.add(bestPlan.getCostEstimate()); |
| this.optimizationRound.stop(phase, "Pick Best Plan"); |
| |
| // Compile the picked plan. |
| this.logger.info("Compiling execution plan..."); |
| this.optimizationRound.start(phase, "Split Stages"); |
| final ExecutionTaskFlow taskFlow = ExecutionTaskFlow.createFrom(bestPlan); |
| final ExecutionPlan compiledPlan = ExecutionPlan.createFrom(taskFlow, this.stageSplittingCriterion); |
| this.optimizationRound.stop(phase, "Split Stages"); |
| |
| bestPlan.mergeJunctionOptimizationContexts(); |
| bestPlan.logTimeEstimates(); |
| |
| this.optimizationRound.stop(phase); |
| return compiledPlan; |
| } |
| |
| |
| /** |
| * Picks the {@link PlanImplementation} with the least squashed cost estimate and stores it in |
| * {@link #planImplementation}. Among equally cheap candidates, the one encountered last wins. |
| * |
| * @param executionPlans the candidates to pick from |
| * @param existingPlan not evaluated by this implementation |
| * @param openChannels not evaluated by this implementation |
| * @param executedStages not evaluated by this implementation |
| * @return the picked {@link PlanImplementation} |
| * @throws WayangException if there are no candidates |
| */ |
| private PlanImplementation pickBestExecutionPlan(Collection<PlanImplementation> executionPlans, |
| ExecutionPlan existingPlan, |
| Set<Channel> openChannels, |
| Set<ExecutionStage> executedStages) { |
| |
| PlanImplementation bestSoFar = null; |
| for (PlanImplementation candidate : executionPlans) { |
| if (bestSoFar == null) { |
| bestSoFar = candidate; |
| continue; |
| } |
| final double bestCost = bestSoFar.getSquashedCostEstimate(); |
| final double candidateCost = candidate.getSquashedCostEstimate(); |
| // Keep the incumbent only if it is strictly cheaper. |
| if (!(bestCost < candidateCost)) { |
| bestSoFar = candidate; |
| } |
| } |
| if (bestSoFar == null) { |
| throw new WayangException("Could not find an execution plan."); |
| } |
| this.logger.info("Picked {} as best plan.", bestSoFar); |
| this.planImplementation = bestSoFar; |
| return bestSoFar; |
| } |
| |
| /** |
| * Go over the given {@link WayangPlan} and update the cardinalities of data being passed between its |
| * {@link Operator}s using the given {@link ExecutionState}. Delegates to the |
| * {@link #cardinalityEstimatorManager}, passing the current {@link #planImplementation} along. |
| * |
| * @param executionState provides the observations from the execution so far |
| * @return whether any cardinalities have been updated |
| */ |
| private boolean reestimateCardinalities(ExecutionState executionState) { |
| return this.cardinalityEstimatorManager.pushCardinalityUpdates(executionState, this.planImplementation); |
| } |
| |
| /** |
| * Creates a new {@link PlanEnumerator} for the {@link #wayangPlan} and {@link #optimizationContext} |
| * that does not build upon an existing {@link ExecutionPlan}. |
| * |
| * @return the {@link PlanEnumerator} |
| */ |
| private PlanEnumerator createPlanEnumerator() { |
| return this.createPlanEnumerator(null, null); |
| } |
| |
| /** |
| * Creates a new {@link PlanEnumerator} for the {@link #wayangPlan} and {@link #optimizationContext}. |
| * |
| * @param existingPlan an {@link ExecutionPlan} to build upon or {@code null} to enumerate from scratch |
| * @param openChannels the open {@link Channel}s of the {@code existingPlan}; ignored if there is none |
| * @return the {@link PlanEnumerator} |
| */ |
| private PlanEnumerator createPlanEnumerator(ExecutionPlan existingPlan, Set<Channel> openChannels) { |
| if (existingPlan != null) { |
| return new PlanEnumerator(this.wayangPlan, this.optimizationContext, existingPlan, openChannels); |
| } |
| return new PlanEnumerator(this.wayangPlan, this.optimizationContext); |
| } |
| |
| /** |
| * Start executing the given {@link ExecutionPlan} with all bells and whistles, such as instrumentation, |
| * logging of the plan, and measuring the execution time. |
| * |
| * @param executionPlan that should be executed |
| * @param executionId an identifier for the current execution |
| * @return whether the execution of the {@link ExecutionPlan} is completed |
| */ |
| private boolean execute(ExecutionPlan executionPlan, int executionId) { |
| final TimeMeasurement currentExecutionRound = this.executionRound.start(String.format("Execution %d", executionId)); |
| |
| // Ensure existence of the #crossPlatformExecutor. |
| if (this.crossPlatformExecutor == null) { |
| final InstrumentationStrategy instrumentation = this.configuration.getInstrumentationStrategyProvider().provide(); |
| this.crossPlatformExecutor = new CrossPlatformExecutor(this, instrumentation); |
| } |
| |
| if (this.configuration.getOptionalBooleanProperty("wayang.core.debug.skipexecution").orElse(false)) { |
| return true; |
| } |
| if (this.configuration.getBooleanProperty("wayang.core.optimizer.reoptimize")) { |
| this.setUpBreakpoint(executionPlan, currentExecutionRound); |
| } |
| |
| // Log the current executionPlan. |
| this.logStages(executionPlan); |
| |
| // Trigger the execution. |
| currentExecutionRound.start("Execute"); |
| boolean isExecutionComplete = this.crossPlatformExecutor.executeUntilBreakpoint( |
| executionPlan, this.optimizationContext |
| ); |
| executionRound.stop(); |
| |
| // Return. |
| return isExecutionComplete; |
| } |
| |
| /** |
| * Sets up a {@link Breakpoint} for an {@link ExecutionPlan}. |
| * |
| * @param executionPlan for that the {@link Breakpoint} should be set |
| * @param round {@link TimeMeasurement} to be extended for any interesting time measurements |
| */ |
| private void setUpBreakpoint(ExecutionPlan executionPlan, TimeMeasurement round) { |
| |
| // Set up appropriate Breakpoints. |
| final TimeMeasurement breakpointRound = round.start("Configure Breakpoint"); |
| FixBreakpoint immediateBreakpoint = new FixBreakpoint(); |
| final Set<ExecutionStage> completedStages = this.crossPlatformExecutor.getCompletedStages(); |
| if (completedStages.isEmpty()) { |
| executionPlan.getStartingStages().forEach(immediateBreakpoint::breakAfter); |
| } else { |
| completedStages.stream() |
| .flatMap(stage -> stage.getSuccessors().stream()) |
| .filter(stage -> !completedStages.contains(stage)) |
| .forEach(immediateBreakpoint::breakAfter); |
| } |
| this.crossPlatformExecutor.setBreakpoint(new ConjunctiveBreakpoint( |
| immediateBreakpoint, |
| this.cardinalityBreakpoint, |
| new NoIterationBreakpoint() // Avoid re-optimization inside of loops. |
| )); |
| breakpointRound.stop(); |
| } |
| |
| /** |
| * Logs the extensive string representation of the given {@link ExecutionPlan} at info level. |
| * |
| * @param executionPlan the {@link ExecutionPlan} to be logged |
| */ |
| private void logStages(ExecutionPlan executionPlan) { |
| // Guard the call, so that the plan string is only built when it is actually logged. |
| if (this.logger.isInfoEnabled()) { |
| this.logger.info("Current execution plan:\n{}", executionPlan.toExtensiveString()); |
| } |
| } |
| |
| /** |
| * Injects the cardinalities obtained from {@link Channel} instrumentation and, if that brought new |
| * insights, updates the {@link ExecutionPlan} through re-optimization. Otherwise, the latest time and |
| * cost estimates are recorded once more. |
| * |
| * @param executionPlan the {@link ExecutionPlan} that is being executed |
| * @param executionId an identifier for the current execution; names the time measurement round |
| * @return {@code false} if the {@link #crossPlatformExecutor} vetoes plan changes, in which case nothing |
| * is done; {@code true} otherwise, also when the re-optimization has been skipped |
| */ |
| private boolean postProcess(ExecutionPlan executionPlan, int executionId) { |
| if (this.crossPlatformExecutor.isVetoingPlanChanges()) { |
| this.logger.info("The cross-platform executor is currently not allowing re-optimization."); |
| return false; |
| } |
| |
| final String roundName = String.format("Post-processing %d", executionId); |
| final TimeMeasurement postProcessingRound = this.optimizationRound.start(roundName); |
| |
| postProcessingRound.start("Reestimate Cardinalities&Time"); |
| final boolean hasNewCardinalities = this.reestimateCardinalities(this.crossPlatformExecutor); |
| postProcessingRound.stop("Reestimate Cardinalities&Time"); |
| |
| postProcessingRound.start("Update Execution Plan"); |
| if (!hasNewCardinalities) { |
| this.logger.info("Skipping re-optimization: no new insights on cardinalities."); |
| // Repeat the latest estimates, so that there is one entry per execution plan. |
| final TimeEstimate latestTimeEstimate = this.timeEstimates.get(this.timeEstimates.size() - 1); |
| this.timeEstimates.add(latestTimeEstimate); |
| final ProbabilisticDoubleInterval latestCostEstimate = this.costEstimates.get(this.costEstimates.size() - 1); |
| this.costEstimates.add(latestCostEstimate); |
| } else { |
| this.logger.info("Re-optimizing execution plan."); |
| this.updateExecutionPlan(executionPlan); |
| } |
| postProcessingRound.stop("Update Execution Plan"); |
| |
| postProcessingRound.stop(); |
| |
| return true; |
| } |
| |
| /** |
| * Re-optimizes the given {@link ExecutionPlan}: retains its completed {@link ExecutionStage}s, enumerates |
| * possible continuations, picks the (seemingly) best one, records its time and cost estimates, and expands |
| * the given {@link ExecutionPlan} with it. |
| * |
| * @param executionPlan the partially executed {@link ExecutionPlan}; it is modified in place |
| * @throws WayangException if the enumeration did not yield any {@link PlanImplementation} |
| */ |
| private void updateExecutionPlan(ExecutionPlan executionPlan) { |
| // Find Channels that have yet to be consumed by unexecuted ExecutionTasks and scrap unexecuted bits of the plan. |
| final Set<ExecutionStage> completedStages = this.crossPlatformExecutor.getCompletedStages(); |
| final Set<Channel> openChannels = executionPlan.retain(completedStages); |
| |
| // Enumerate all possible plans. |
| final PlanEnumerator planEnumerator = this.createPlanEnumerator(executionPlan, openChannels); |
| final PlanEnumeration comprehensiveEnumeration = planEnumerator.enumerate(true); |
| final Collection<PlanImplementation> executionPlans = comprehensiveEnumeration.getPlanImplementations(); |
| this.logger.debug("Enumerated {} plans.", executionPlans.size()); |
| for (PlanImplementation candidate : executionPlans) { |
| this.logger.debug("Plan with operators: {}", candidate.getOperators()); |
| } |
| |
| // Pick an execution plan; this fails if there is no candidate at all. |
| // The pick is stored in #planImplementation. |
| this.pickBestExecutionPlan(executionPlans, executionPlan, openChannels, completedStages); |
| this.timeEstimates.add(this.planImplementation.getTimeEstimate()); |
| this.costEstimates.add(this.planImplementation.getCostEstimate()); |
| |
| // Compile the picked plan and attach it to the retained part of the execution plan. |
| ExecutionTaskFlow executionTaskFlow = ExecutionTaskFlow.recreateFrom( |
| this.planImplementation, executionPlan, openChannels, completedStages |
| ); |
| final ExecutionPlan executionPlanExpansion = ExecutionPlan.createFrom(executionTaskFlow, this.stageSplittingCriterion); |
| executionPlan.expand(executionPlanExpansion); |
| |
| this.planImplementation.mergeJunctionOptimizationContexts(); |
| |
| assert executionPlan.isSane(); |
| } |
| |
| /** |
| * Asks this instance to release its critical resources to avoid resource leaks and to enhance durability and |
| * consistency of accessed resources: the {@link CardinalityRepository} is put to sleep and the |
| * {@link #crossPlatformExecutor}, if one has been allocated, is shut down. |
| */ |
| private void releaseResources() { |
| try { |
| this.wayangContext.getCardinalityRepository().sleep(); |
| } finally { |
| // Shut the executor down even if the repository could not be put to sleep. |
| if (this.crossPlatformExecutor != null) { |
| this.crossPlatformExecutor.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Logs and stores what has been learned from the execution: the final cardinalities go to the |
| * {@link CardinalityRepository}, the {@link PartialExecution}s to the {@link ExecutionLog}, and the |
| * measured and estimated times and costs, the cost settings, and the {@link PlanMetrics} to the |
| * {@link #experiment}. Note that the {@link #experiment} is accessed without a {@code null} check. |
| */ |
| private void logExecution() { |
| this.stopWatch.start("Post-processing", "Log measurements"); |
| |
| // For the last time, update the cardinalities and store them. |
| this.reestimateCardinalities(this.crossPlatformExecutor); |
| final CardinalityRepository cardinalityRepository = this.wayangContext.getCardinalityRepository(); |
| cardinalityRepository.storeAll(this.crossPlatformExecutor, this.optimizationContext); |
| |
| // Execution times. |
| final Collection<PartialExecution> partialExecutions = this.crossPlatformExecutor.getPartialExecutions(); |
| |
| // Add the execution times to the experiment. |
| int nextPartialExecutionMeasurementId = 0; |
| for (PartialExecution partialExecution : partialExecutions) { |
| // Point out cardinality estimates that are still inexact (debugging aid only). |
| if (this.logger.isDebugEnabled()) { |
| for (AtomicExecutionGroup atomicExecutionGroup : partialExecution.getAtomicExecutionGroups()) { |
| if (!(atomicExecutionGroup.getEstimationContext() instanceof OptimizationContext.OperatorContext)) { |
| continue; |
| } |
| OptimizationContext.OperatorContext operatorContext = |
| (OptimizationContext.OperatorContext) atomicExecutionGroup.getEstimationContext(); |
| |
| for (CardinalityEstimate cardinality : operatorContext.getInputCardinalities()) { |
| if (cardinality != null && !cardinality.isExact()) { |
| this.logger.debug( |
| "Inexact input cardinality estimate {} for {}.", |
| cardinality, operatorContext.getOperator() |
| ); |
| } |
| } |
| for (CardinalityEstimate cardinality : operatorContext.getOutputCardinalities()) { |
| if (cardinality != null && !cardinality.isExact()) { |
| this.logger.debug( |
| "Inexact output cardinality estimate {} for {}.", |
| cardinality, operatorContext.getOperator() |
| ); |
| } |
| } |
| } |
| } |
| String id = String.format("par-ex-%03d", nextPartialExecutionMeasurementId++); |
| final PartialExecutionMeasurement measurement = new PartialExecutionMeasurement(id, partialExecution, this.configuration); |
| this.experiment.addMeasurement(measurement); |
| } |
| |
| // Feed the execution log. A failure here is logged, but does not abort the job. |
| try (ExecutionLog executionLog = ExecutionLog.open(this.configuration)) { |
| executionLog.storeAll(partialExecutions); |
| } catch (Exception e) { |
| this.logger.error("Storing partial executions failed.", e); |
| } |
| // Stop the measurement on the StopWatch, where it has been started. |
| this.stopWatch.stop("Post-processing", "Log measurements"); |
| |
| // Log the execution time. |
| long effectiveExecutionMillis = partialExecutions.stream() |
| .map(PartialExecution::getMeasuredExecutionTime) |
| .reduce(0L, (a, b) -> a + b); |
| long measuredExecutionMillis = this.executionRound.getMillis(); |
| this.logger.info( |
| "Accumulated execution time: {} (effective: {}, overhead: {})", |
| Formats.formatDuration(measuredExecutionMillis, true), |
| Formats.formatDuration(effectiveExecutionMillis, true), |
| Formats.formatDuration(measuredExecutionMillis - effectiveExecutionMillis, true) |
| ); |
| // Record the bounds of every time estimate; the plans are numbered starting from 1. |
| int i = 1; |
| for (TimeEstimate timeEstimate : this.timeEstimates) { |
| this.logger.info("Estimated execution time (plan {}): {}", i, timeEstimate); |
| TimeMeasurement lowerEstimate = new TimeMeasurement(String.format("Estimate %d (lower)", i)); |
| lowerEstimate.setMillis(timeEstimate.getLowerEstimate()); |
| this.stopWatch.getExperiment().addMeasurement(lowerEstimate); |
| TimeMeasurement upperEstimate = new TimeMeasurement(String.format("Estimate %d (upper)", i)); |
| upperEstimate.setMillis(timeEstimate.getUpperEstimate()); |
| this.stopWatch.getExperiment().addMeasurement(upperEstimate); |
| i++; |
| } |
| |
| // Log the cost settings. |
| final Collection<Platform> consideredPlatforms = this.configuration.getPlatformProvider().provideAll(); |
| for (Platform consideredPlatform : consideredPlatforms) { |
| final TimeToCostConverter timeToCostConverter = this.configuration |
| .getTimeToCostConverterProvider() |
| .provideFor(consideredPlatform); |
| this.experiment.getSubject().addConfiguration( |
| String.format("Costs per ms (%s)", consideredPlatform.getName()), |
| timeToCostConverter.getCostsPerMillisecond() |
| ); |
| this.experiment.getSubject().addConfiguration( |
| String.format("Fix costs (%s)", consideredPlatform.getName()), |
| timeToCostConverter.getFixCosts() |
| ); |
| } |
| |
| |
| // Log the execution costs: the fix costs of the involved platforms plus the measured costs. |
| double fixCosts = partialExecutions.stream() |
| .flatMap(partialExecution -> partialExecution.getInvolvedPlatforms().stream()) |
| .map(platform -> this.configuration.getTimeToCostConverterProvider().provideFor(platform).getFixCosts()) |
| .reduce(0d, (a, b) -> a + b); |
| double effectiveLowerCosts = fixCosts + partialExecutions.stream() |
| .map(PartialExecution::getMeasuredLowerCost) |
| .reduce(0d, (a, b) -> a + b); |
| double effectiveUpperCosts = fixCosts + partialExecutions.stream() |
| .map(PartialExecution::getMeasuredUpperCost) |
| .reduce(0d, (a, b) -> a + b); |
| this.logger.info("Accumulated costs: {} .. {}", |
| String.format("%,.2f", effectiveLowerCosts), |
| String.format("%,.2f", effectiveUpperCosts) |
| ); |
| this.experiment.addMeasurement( |
| new CostMeasurement("Measured cost", effectiveLowerCosts, effectiveUpperCosts, 1d) |
| ); |
| i = 1; |
| for (ProbabilisticDoubleInterval costEstimate : this.costEstimates) { |
| this.logger.info("Estimated costs (plan {}): {}", i, costEstimate); |
| this.experiment.addMeasurement(new CostMeasurement( |
| String.format("Estimated costs (%d)", i), |
| costEstimate.getLowerEstimate(), |
| costEstimate.getUpperEstimate(), |
| costEstimate.getCorrectnessProbability() |
| )); |
| i++; |
| } |
| |
| // Log some plan metrics. |
| final PlanMetrics planMetrics = PlanMetrics.createFor(this.wayangPlan, "Plan Metrics"); |
| this.logger.info("Plan metrics: {} virtual operators, {} execution operators, {} alternatives, {} combinations", |
| planMetrics.getNumVirtualOperators(), |
| planMetrics.getNumExecutionOperators(), |
| planMetrics.getNumAlternatives(), |
| planMetrics.getNumCombinations() |
| ); |
| this.experiment.addMeasurement(planMetrics); |
| } |
| |
| /** |
| * Tells whether a {@link Breakpoint} is requested for the given {@link OutputSlot}. That is the case if |
| * proactive re-optimization is enabled, the owner of the {@link OutputSlot} is not inside of a loop, and |
| * the {@link #cardinalityBreakpoint} does not approve the output's cardinality estimate. |
| * |
| * @param output the {@link OutputSlot} |
| * @param operatorContext the {@link OptimizationContext.OperatorContext} for the {@link OutputSlot} owner |
| * @return whether a {@link Breakpoint} is requested |
| */ |
| public boolean isRequestBreakpointFor(OutputSlot<?> output, OptimizationContext.OperatorContext operatorContext) { |
| if (!this.isProactiveReoptimization) { |
| return false; |
| } |
| if (output.getOwner().getInnermostLoop() != null) { |
| return false; |
| } |
| if (this.cardinalityBreakpoint == null) { |
| return false; |
| } |
| return !this.cardinalityBreakpoint.approves(operatorContext.getOutputCardinality(output.getIndex())); |
| } |
| |
| /** |
| * Provides the {@link Job}-level {@link Configuration} of this instance. Modify it to control the |
| * {@link Job} execution. |
| * |
| * @return the {@link Configuration} |
| */ |
| public Configuration getConfiguration() { |
| return this.configuration; |
| } |
| |
| /** |
| * Provides the paths of the JAR files that have been registered for the UDFs of this instance. |
| * |
| * @return the internal {@link Set} of paths (not a copy) |
| * @see #addUdfJar(String) |
| */ |
| public Set<String> getUdfJarPaths() { |
| return this.udfJarPaths; |
| } |
| |
| /** |
| * Provide the {@link CrossPlatformExecutor} used during the execution of this instance. It is allocated |
| * only once the execution of an {@link ExecutionPlan} is started. |
| * |
| * @return the {@link CrossPlatformExecutor} or {@code null} if there is none allocated |
| */ |
| public CrossPlatformExecutor getCrossPlatformExecutor() { |
| return this.crossPlatformExecutor; |
| } |
| |
| /** |
| * Provide the {@link DefaultOptimizationContext} for the {@link WayangPlan} of this instance. |
| * |
| * @return the {@link DefaultOptimizationContext} or {@code null} if it has not been created yet |
| */ |
| public DefaultOptimizationContext getOptimizationContext() { |
| return this.optimizationContext; |
| } |
| |
| /** |
| * Retrieves the name of this instance. If no name was given upon creation, this is the default |
| * name {@code "Wayang app"}. |
| * |
| * @return the name, never {@code null} |
| */ |
| public String getName() { |
| return this.name; |
| } |
| |
| /** |
| * Describes this instance by its simple class name followed by its name in brackets. |
| * |
| * @return the description |
| */ |
| @Override |
| public String toString() { |
| return this.getClass().getSimpleName() + "[" + this.name + "]"; |
| } |
| |
| /** |
| * Provide the {@link Experiment} being recorded with the execution of this instance, i.e., the one |
| * that was handed in upon creation. |
| * |
| * @return the {@link Experiment} |
| */ |
| public Experiment getExperiment() { |
| return this.experiment; |
| } |
| |
| /** |
| * Provide the {@link WayangPlan} executed by this instance, i.e., the one that was handed in upon creation. |
| * |
| * @return the {@link WayangPlan} |
| */ |
| public WayangPlan getWayangPlan() { |
| return this.wayangPlan; |
| } |
| |
| /** |
| * Provide the {@link StopWatch} that is used to instrument the execution of this instance. It has been |
| * created for the {@link Experiment} of this instance. |
| * |
| * @return the {@link StopWatch} |
| */ |
| public StopWatch getStopWatch() { |
| return this.stopWatch; |
| } |
| |
| /** |
| * Provides a general-purpose cache. Can be used to communicate job-global information. |
| * |
| * @return the internal cache {@link Map} (not a copy) |
| */ |
| public Map<String, Object> getCache() { |
| return this.cache; |
| } |
| } |