blob: 8c96ac0166ed4fee0f58d399c17bdf65a17d87a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.platform;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionStageLoop;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator;
import org.apache.wayang.core.plan.wayangplan.LoopSubplan;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.profiling.InstrumentationStrategy;
import org.apache.wayang.core.util.AbstractReferenceCountable;
import org.apache.wayang.core.util.Formats;
;
/**
* Executes a (cross-platform) {@link ExecutionPlan}.
*/
public class CrossPlatformExecutor implements ExecutionState {
/**
* Logger of this instance; also used by the inner {@link StageActivator} and {@link ExecutionStageLoopContext} classes.
*/
public final Logger logger = LogManager.getLogger(this.getClass());
/**
* The {@link Job} that is being executed by this instance. It also provides the {@link Configuration}.
*/
private final Job job;
/**
* Aggregates user-defined {@link Breakpoint}s and decides whether an {@link ExecutionStage} may be executed.
* Defaults to {@link Breakpoint#NONE} and can be replaced via {@link #setBreakpoint(Breakpoint)}.
* Will be cleared after each execution (NOTE(review): the clearing is not done in the visible part of this file — confirm).
*/
private Breakpoint breakpoint = Breakpoint.NONE;
/**
* All {@link ExecutionStage}s in the processed {@link ExecutionPlan}; refilled by {@link #prepare}.
*/
private final Set<ExecutionStage> allStages = new HashSet<>();
/**
* {@link StageActivator}s whose inputs are satisfied; they are considered for execution in FIFO order.
*/
private final Queue<StageActivator> activatedStageActivators = new LinkedList<>();
/**
* {@link StageActivator}s that have been created but not yet activated, keyed by their {@link ExecutionStage}.
*/
private final Map<ExecutionStage, StageActivator> pendingStageActivators = new HashMap<>();
/**
* Maintains the {@link Executor}s for each {@link Platform}. Each {@link Executor} is also registered as a
* global resource when it is created.
*/
private final Map<Platform, Executor> executors = new HashMap<>();
/**
* {@link StageActivator}s that the {@link #breakpoint} did not permit to execute.
* We keep them around if we want to go on without re-optimization.
*/
private final Collection<StageActivator> suspendedStages = new LinkedList<>();
/**
* When executing an {@link ExecutionStageLoop}, we might need to reuse several {@link ExecutionResource}s
* among all iterations. If we would go with our normal handling scheme, we might lose them after the first
* iteration. Therefore, we actively keep track of them via {@link ExecutionStageLoopContext}s.
*/
private final Map<ExecutionStageLoop, ExecutionStageLoopContext> loopContexts = new HashMap<>();
/**
* Marks {@link Channel}s for instrumentation; applied to every {@link ExecutionStage} before its execution.
*/
private final InstrumentationStrategy instrumentationStrategy;
/**
* Keeps track of {@link ExecutionStage}s that have actually been executed by this instance.
*/
private Set<ExecutionStage> completedStages = new HashSet<>();
/**
* Keeps track of {@link ChannelInstance} cardinalities.
*/
private final Collection<ChannelInstance> cardinalityMeasurements = new LinkedList<>();
/**
* Maintains {@link ExecutionResource}s that are "global" w.r.t. this instance, i.e., they will not be
* instantly disposed if not currently used. They are released in {@link #shutdown()}.
*/
private final Set<ExecutionResource> globalResources = new HashSet<>(2);
/**
* Keeps track of {@link ChannelInstance}s of {@link Channel}s that do not belong to an {@link ExecutionStageLoop}
* so as to reuse them among {@link Executor} runs. This map holds one reference on each of its values.
*/
private final Map<Channel, ChannelInstance> channelInstances = new HashMap<>();
/**
* Gathers {@link PartialExecution}s created during the execution.
*/
private final Collection<PartialExecution> partialExecutions = new LinkedList<>();
/**
* Gathers the {@link Thread}s running {@link ParallelExecutionThread}s during parallel execution; cleared once
* all of them have been joined.
*/
private final ArrayList<Thread> parallelExecutionThreads = new ArrayList<>();
/**
* Keeps track of the completed {@link ParallelExecutionThread}s created during parallel execution.
* NOTE(review): not read or written in the visible part of this file — presumably maintained by the parallel
* execution threads; confirm.
*/
private volatile int completedThreads;
/**
 * Creates a new instance that executes {@link ExecutionStage}s on behalf of the given {@link Job}.
 *
 * @param job                     whose plan is executed; it also supplies the {@link Configuration}
 * @param instrumentationStrategy applied to every {@link ExecutionStage} before it is executed
 */
public CrossPlatformExecutor(Job job, InstrumentationStrategy instrumentationStrategy) {
    this.instrumentationStrategy = instrumentationStrategy;
    this.job = job;
}
/**
 * Executes the given {@link ExecutionPlan} until it is either complete or the current {@link Breakpoint}
 * holds back further {@link ExecutionStage}s.
 *
 * @param executionPlan       that should be executed or continued
 * @param optimizationContext provides the {@link OptimizationContext}s for the {@link ExecutionStage}s
 * @return {@code true} if no {@link ExecutionStage} was suspended, i.e., the {@link ExecutionPlan} was completed
 */
public boolean executeUntilBreakpoint(ExecutionPlan executionPlan, OptimizationContext optimizationContext) {
    // (Re-)build the stage bookkeeping from the plan, then run as far as the breakpoint allows.
    this.prepare(executionPlan, optimizationContext);
    this.runToBreakpoint();
    final boolean isNothingSuspended = this.suspendedStages.isEmpty();
    return isNothingSuspended;
}
/**
 * Resets the {@link ExecutionStage}-related state of this instance. It then (re-)creates {@link StageActivator}s
 * for all not-yet-completed {@link ExecutionStage}s of the given {@link ExecutionPlan} and tries to activate them.
 *
 * @param executionPlan       whose {@link ExecutionStage}s will be executed
 * @param optimizationContext contains additional optimization info for the {@code executionPlan}
 */
public void prepare(ExecutionPlan executionPlan, OptimizationContext optimizationContext) {
    this.allStages.clear();
    this.activatedStageActivators.clear();
    this.suspendedStages.clear();
    this.allStages.addAll(executionPlan.getStages());

    // Drop StageActivators of stages that are not part of the plan anymore (e.g., after re-optimization).
    this.pendingStageActivators.keySet().removeIf(stage -> !this.allStages.contains(stage));

    // Create (or reuse) StageActivators for all stages that have not been executed yet.
    for (ExecutionStage stage : this.allStages) {
        if (!this.completedStages.contains(stage)) {
            final StageActivator activator = this.getOrCreateActivator(
                    stage,
                    () -> this.determineInitialOptimizationContext(stage, optimizationContext)
            );
            this.tryToActivate(activator);
        }
    }
}
/**
 * Determines the initial {@link OptimizationContext} for the {@link StageActivator} of an {@link ExecutionStage}.
 *
 * @param stage                   whose {@link OptimizationContext} is to be determined
 * @param rootOptimizationContext the {@link OptimizationContext} of the whole plan
 * @return {@code rootOptimizationContext} for stages outside of loops; otherwise the initial iteration context
 *         of the stage's loop
 */
private OptimizationContext determineInitialOptimizationContext(ExecutionStage stage, OptimizationContext rootOptimizationContext) {
    final ExecutionStageLoop loop = stage.getLoop();
    if (loop != null) {
        // TODO: Assumes non-nested loops.
        final LoopSubplan loopSubplan = loop.getLoopSubplan();
        return rootOptimizationContext.getNestedLoopContext(loopSubplan).getInitialIterationContext();
    }
    return rootOptimizationContext;
}
/**
 * Looks up the {@link StageActivator} registered for the given {@link ExecutionStage} in
 * {@link #pendingStageActivators}. If there is none, creates and registers a new one.
 *
 * @param stage                       for which the {@link StageActivator} is requested
 * @param optimizationContextSupplier lazily supplies the {@link OptimizationContext}; only invoked when a new
 *                                    {@link StageActivator} has to be created
 * @return the existing or newly created {@link StageActivator}
 */
private StageActivator getOrCreateActivator(
        ExecutionStage stage,
        final Supplier<OptimizationContext> optimizationContextSupplier
) {
    return this.pendingStageActivators.computeIfAbsent(
            stage,
            key -> new StageActivator(key, optimizationContextSupplier.get())
    );
}
/**
 * Activates the given {@link StageActivator} if all of its required inputs are available, i.e., moves it from
 * {@link #pendingStageActivators} to {@link #activatedStageActivators}.
 *
 * @param activator that should be activated
 * @return whether an activation took place
 */
private boolean tryToActivate(StageActivator activator) {
    if (!activator.updateInputChannelInstances()) {
        return false;
    }
    final ExecutionStage stage = activator.getStage();
    logger.info("Activating {}.", stage);
    // Move the activator from the pending to the activated ones.
    this.pendingStageActivators.remove(stage);
    assert this.activatedStageActivators.stream().noneMatch(other -> other.getStage().equals(stage)) :
            String.format("Must not activate %s twice.", stage);
    this.activatedStageActivators.add(activator);
    activator.noteActivation();
    return true;
}
/**
 * Executes a single {@link ExecutionStage} unless the {@link #breakpoint} suspends it. Afterwards, tries to
 * activate its successors and discards {@link ChannelInstance}s that are no longer referenced elsewhere.
 *
 * @param isBreakpointsDisabled whether the {@link #breakpoint} should be ignored
 * @param stageActivator        wraps the {@link ExecutionStage} to execute
 */
private void executeSingleStage(boolean isBreakpointsDisabled, StageActivator stageActivator) {
    final boolean isSuspended = !isBreakpointsDisabled && this.suspendIfBreakpointRequest(stageActivator);
    if (isSuspended) {
        return;
    }

    this.execute(stageActivator);
    this.tryToActivateSuccessors(stageActivator);
    // The input ChannelInstances collected by the activator are not needed anymore.
    stageActivator.dispose();

    // Discard ChannelInstances whose only remaining reference is the one held by #channelInstances.
    for (Iterator<Map.Entry<Channel, ChannelInstance>> entries = this.channelInstances.entrySet().iterator();
         entries.hasNext(); ) {
        final ChannelInstance instance = entries.next().getValue();
        if (instance.getNumReferences() == 1) {
            instance.noteDiscardedReference(true);
            entries.remove();
        }
    }
}
/**
 * Starts one {@link ParallelExecutionThread} per currently activated {@link ExecutionStage} and waits until
 * all of them have terminated.
 * <p>
 * If the calling thread is interrupted while waiting, it still waits for all started threads, so that no worker
 * mutates the state of this instance after this method has returned. The interrupt status of the calling
 * thread is restored before returning.
 *
 * @param isBreakpointsDisabled whether the {@link #breakpoint} should be ignored by the threads
 */
private void runParallelExecution(boolean isBreakpointsDisabled) {
    final int numActiveStages = this.activatedStageActivators.size();
    // Create and start the execution threads.
    for (int i = 1; i <= numActiveStages; ++i) {
        // TODO: Better pass the stage to the thread rather than letting the thread retrieve the stage itself (to avoid concurrency issues).
        final Thread thread = new Thread(new ParallelExecutionThread(isBreakpointsDisabled, "T" + i, this));
        thread.start();
        this.parallelExecutionThreads.add(thread);
    }

    boolean isInterrupted = false;
    try {
        // Join all created threads. An interrupt must not make us return while a thread is still running.
        for (Thread thread : this.parallelExecutionThreads) {
            while (true) {
                try {
                    thread.join();
                    break;
                } catch (InterruptedException e) {
                    this.logger.error("Thread Interrupted!", e);
                    // The exception cleared the interrupt flag; remember it and keep waiting for this thread.
                    isInterrupted = true;
                }
            }
        }
    } finally {
        // Clear the list of created threads.
        this.parallelExecutionThreads.clear();
        if (isInterrupted) {
            // Restore the interrupt status so that callers can still react to the interruption.
            Thread.currentThread().interrupt();
        }
    }
    this.logger.info("Parallel execution ended!");
}
/**
 * Executes activated {@link ExecutionStage}s, and thereby activates their successors, until no activated
 * {@link ExecutionStage} is left. If the {@link #breakpoint} prevented every single stage from executing, the
 * suspended stages are retried once with breakpoints disabled so that the execution makes progress.
 */
private void runToBreakpoint() {
    final long startTime = System.currentTimeMillis();
    final int numPriorExecutedStages = this.completedStages.size();
    int numExecutedStages;
    boolean isBreakpointsDisabled = false;
    do {
        // Execute and activate as long as possible.
        while (!this.activatedStageActivators.isEmpty()) {
            final boolean isParallelizable = this.activatedStageActivators.size() > 1
                    && this.getConfiguration().getBooleanProperty("wayang.core.optimizer.enumeration.parallel-tasks");
            if (isParallelizable) {
                // Several independent stages are ready: run them in parallel threads.
                this.runParallelExecution(isBreakpointsDisabled);
            } else {
                this.executeSingleStage(isBreakpointsDisabled, this.activatedStageActivators.poll());
            }
        }

        // Safety net to recover from illegal Breakpoint configurations.
        numExecutedStages = this.completedStages.size() - numPriorExecutedStages;
        final boolean isStuck = !isBreakpointsDisabled && numExecutedStages == 0;
        if (isStuck) {
            this.logger.warn("Could not execute a single stage. Will retry with disabled breakpoints.");
            this.activatedStageActivators.addAll(this.suspendedStages);
            this.suspendedStages.clear();
        }
        isBreakpointsDisabled = isStuck;
    } while (!this.activatedStageActivators.isEmpty());

    final long finishTime = System.currentTimeMillis();
    this.logger.info("Executed {} stages in {}.",
            numExecutedStages, Formats.formatDuration(finishTime - startTime, true));
    assert numExecutedStages > 0 : "Did not execute a single stage.";
}
/**
 * Consults the {@link #breakpoint}. If it does not permit executing the given {@link ExecutionStage}, the
 * {@link StageActivator} is parked in {@link #suspendedStages}.
 *
 * @param stageActivator that might be suspended
 * @return whether the {@link ExecutionStage} was suspended
 */
private boolean suspendIfBreakpointRequest(StageActivator stageActivator) {
    final boolean isPermitted = this.breakpoint.permitsExecutionOf(
            stageActivator.getStage(), this, this.job.getOptimizationContext()
    );
    if (isPermitted) {
        return false;
    }
    this.suspendedStages.add(stageActivator);
    return true;
}
/**
 * Executes the given {@link ExecutionStage} on the {@link Executor} of its {@link Platform} and records it in
 * {@link #completedStages}.
 * <p>
 * The {@link #instrumentationStrategy} is applied to the stage before execution. If the stage is a loop head,
 * the previous transition context of its {@link ExecutionStageLoopContext} is scrapped afterwards.
 *
 * @param stageActivator wraps the {@link ExecutionStage} to execute and its {@link OptimizationContext}
 */
private void execute(StageActivator stageActivator) {
    final ExecutionStage stage = stageActivator.getStage();
    final OptimizationContext optimizationContext = stageActivator.getOptimizationContext();

    // Find parts of the stage to instrument.
    this.instrumentationStrategy.applyTo(stage);

    // Obtain an Executor for the stage and have the execution done.
    final Executor executor = this.getOrCreateExecutorFor(stage);
    this.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
    final long startTime = System.currentTimeMillis();
    executor.execute(stage, optimizationContext, this);
    final long finishTime = System.currentTimeMillis();
    this.logger.info("Executed {} in {}.", stage, Formats.formatDuration(finishTime - startTime, true));

    // Remember that we have executed the stage.
    this.completedStages.add(stage);
    if (stage.isLoopHead()) {
        this.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
    }
}
/**
 * Provides the {@link Executor} for the {@link Platform} of the given {@link ExecutionStage}. On first use it is
 * created and registered as a global resource.
 *
 * @param stage that should be executed
 * @return the {@link Executor} for the stage's {@link Platform}
 */
private Executor getOrCreateExecutorFor(ExecutionStage stage) {
    final Platform platform = stage.getPlatformExecution().getPlatform();
    return this.executors.computeIfAbsent(platform, p -> {
        // Registering the Executor globally keeps it from being disposed between disconnected
        // PlatformExecutions. The downside is that it is only released once the whole execution is done.
        final Executor newExecutor = p.getExecutorFactory().create(this.job);
        this.registerGlobal(newExecutor);
        return newExecutor;
    });
}
/**
 * Inspects the outbound {@link Channel}s of the just-executed {@link ExecutionStage}. For every consuming
 * {@link ExecutionStage} that now has a {@link ChannelInstance} available, tries to activate it.
 *
 * @param processedStageActivator should have just been executed
 */
private void tryToActivateSuccessors(StageActivator processedStageActivator) {
    final ExecutionStage processedStage = processedStageActivator.getStage();
    final Collection<Channel> outboundChannels = processedStage.getOutboundChannels();

    // Collect the consumer stages that have just been served with a ChannelInstance.
    final Set<ExecutionStage> successorStages = new HashSet<>(outboundChannels.size());
    for (Channel outboundChannel : outboundChannels) {
        for (ExecutionTask consumer : outboundChannel.getConsumers()) {
            final boolean isServed =
                    this.getChannelInstance(outboundChannel, consumer.isFeedbackInput(outboundChannel)) != null;
            if (!isServed) {
                continue;
            }
            final ExecutionStage consumerStage = consumer.getStage();
            // Outbound Channels may have consumers inside the producing stage itself; skip those and finished loops.
            if (consumerStage == processedStage || consumerStage.isInFinishedLoop()) {
                continue;
            }
            successorStages.add(consumerStage);
        }
    }

    // Try to activate the collected stages.
    for (ExecutionStage successorStage : successorStages) {
        final StageActivator activator = this.getOrCreateActivator(
                successorStage,
                () -> this.determineNextOptimizationContext(processedStageActivator, successorStage)
        );
        this.tryToActivate(activator);
    }
}
/**
 * Determines the {@link OptimizationContext} for the {@link StageActivator} of an {@link ExecutionStage} that is
 * activated by a preceding {@link StageActivator}.
 *
 * @param processedStageActivator the preceding {@link StageActivator}
 * @param successorStage          the {@link ExecutionStage} whose {@link OptimizationContext} is to be determined
 * @return the {@link OptimizationContext}
 */
private OptimizationContext determineNextOptimizationContext(
        StageActivator processedStageActivator,
        ExecutionStage successorStage
) {
    final OptimizationContext prevOptimizationContext = processedStageActivator.getOptimizationContext();
    final ExecutionStageLoop successorLoop = successorStage.getLoop();

    // Stages outside of loops use the root OptimizationContext.
    if (successorLoop == null) {
        return prevOptimizationContext.getRootParent();
    }

    // Entering a loop from outside: start with its initial iteration context.
    // TODO: This code assumes non-nested loops.
    if (processedStageActivator.getStage().getLoop() != successorLoop) {
        final LoopSubplan loopSubplan = successorLoop.getLoopSubplan();
        return prevOptimizationContext.getRootParent().getNestedLoopContext(loopSubplan).getInitialIterationContext();
    }

    // Staying within the same loop: a loop head switches to the next iteration context.
    if (successorStage.isLoopHead()) {
        return prevOptimizationContext.getNextIterationContext();
    }

    // A loop body stage stays in the current iteration context. The exception is the final iteration context,
    // which holds no more information for the loop body; in that case, sneak in a new iteration context.
    if (prevOptimizationContext.isFinalIteration()) {
        return prevOptimizationContext.getLoopContext().appendIterationContext();
    }
    return prevOptimizationContext;
}
/**
 * Provides the {@link ExecutionStageLoopContext} for the given {@link ExecutionStageLoop}. If none exists yet in
 * {@link #loopContexts}, a new one is created and registered there.
 *
 * @param loop for which an {@link ExecutionStageLoopContext} is requested
 * @return the existing or newly created {@link ExecutionStageLoopContext}
 */
private ExecutionStageLoopContext getOrCreateLoopContext(ExecutionStageLoop loop) {
    return this.loopContexts.computeIfAbsent(loop, key -> new ExecutionStageLoopContext(key));
}
/**
 * Removes the {@link ExecutionStageLoopContext} of the given {@link ExecutionStageLoop} from {@link #loopContexts}.
 * With assertions enabled, verifies that such a context was registered and that it is no longer referenced.
 *
 * @param loop that is described by the {@link ExecutionStageLoopContext}
 */
private void removeLoopContext(ExecutionStageLoop loop) {
    final ExecutionStageLoopContext context = this.loopContexts.remove(loop);
    // Check for null explicitly: a missing context should yield a descriptive AssertionError rather than an NPE.
    assert context != null : String.format("No loop context registered for %s.", loop);
    assert context.getNumReferences() == 0 :
            String.format("Removed loop context for %s while it still has %s reference(s).", loop, context.getNumReferences());
}
/**
 * Provides the {@link ChannelInstance} for the given {@link Channel} without peeking into the next loop transition.
 *
 * @param channel whose {@link ChannelInstance} is requested
 * @return the {@link ChannelInstance} or {@code null} if none is available
 */
@Override
public ChannelInstance getChannelInstance(Channel channel) {
    final boolean isPeekingToNextTransition = false;
    return this.getChannelInstance(channel, isPeekingToNextTransition);
}
/**
 * Provides the {@link ChannelInstance} for the given {@link Channel}. Instances of {@link Channel}s that belong to
 * an {@link ExecutionStageLoop} are served by that loop's {@link ExecutionStageLoopContext}; all others are served
 * from {@link #channelInstances}.
 *
 * @param channel                   whose {@link ChannelInstance} is requested
 * @param isPeekingToNextTransition whether a loop-local lookup should use the next transition
 * @return the {@link ChannelInstance} or {@code null} if none is available
 */
public ChannelInstance getChannelInstance(Channel channel, boolean isPeekingToNextTransition) {
    final ExecutionStageLoop loop = getExecutionStageLoop(channel);
    return loop == null
            ? this.channelInstances.get(channel)
            : this.getOrCreateLoopContext(loop).getChannelInstance(channel, isPeekingToNextTransition);
}
/**
 * Determines the {@link ExecutionStageLoop} that the given {@link Channel} belongs to. A {@link Channel} belongs to
 * the loop of its producer's stage, unless it implements a final loop output of a {@link LoopHeadOperator}.
 *
 * @param channel the {@link Channel}
 * @return the {@link ExecutionStageLoop} or {@code null} if none
 */
private static ExecutionStageLoop getExecutionStageLoop(Channel channel) {
    final ExecutionStage producerStage = channel.getProducer().getStage();
    final ExecutionStageLoop producerLoop = producerStage.getLoop();
    if (producerLoop == null) {
        return null;
    }
    // Final outputs of a loop head leave the loop, so they are not attributed to it.
    final OutputSlot<?> output = channel.getProducer().getOutputSlotFor(channel);
    final boolean isFinalLoopOutput = output != null
            && output.getOwner().isLoopHead()
            && ((LoopHeadOperator) output.getOwner()).getFinalLoopOutputs().contains(output);
    return isFinalLoopOutput ? null : producerLoop;
}
/**
 * Registers a newly produced {@link ChannelInstance}. Loop-local instances are handed to the according
 * {@link ExecutionStageLoopContext}. All others are stored in {@link #channelInstances}, replacing (and releasing)
 * any previous instance of the same {@link Channel}.
 *
 * @param channelInstance the {@link ChannelInstance} to register
 */
@Override
public void register(ChannelInstance channelInstance) {
    final Channel channel = channelInstance.getChannel();
    final ExecutionStageLoop loop = getExecutionStageLoop(channel);
    if (loop != null) {
        this.getOrCreateLoopContext(loop).register(channelInstance);
        return;
    }
    final ChannelInstance replacedInstance = this.channelInstances.put(channel, channelInstance);
    channelInstance.noteObtainedReference();
    if (replacedInstance != null) {
        replacedInstance.noteDiscardedReference(true);
    }
}
/**
 * Registers a global {@link ExecutionResource} and obtains a reference to it. The reference is only released
 * once this instance shuts down. Registering the same resource twice only logs a warning.
 *
 * @param resource that should be registered
 */
public void registerGlobal(ExecutionResource resource) {
    final boolean isNewResource = this.globalResources.add(resource);
    if (!isNewResource) {
        this.logger.warn("Registered {} twice.", resource);
        return;
    }
    resource.noteObtainedReference();
}
/**
 * Remembers a {@link ChannelInstance} whose cardinality has been measured.
 *
 * @param channelInstance the measured {@link ChannelInstance}
 */
@Override
public void addCardinalityMeasurement(ChannelInstance channelInstance) {
    final Collection<ChannelInstance> measurements = this.cardinalityMeasurements;
    measurements.add(channelInstance);
}
/**
 * Provides the {@link ChannelInstance}s whose cardinalities have been measured so far.
 *
 * @return the internal (live, mutable) collection of measured {@link ChannelInstance}s
 */
@Override
public Collection<ChannelInstance> getCardinalityMeasurements() {
    final Collection<ChannelInstance> measurements = this.cardinalityMeasurements;
    return measurements;
}
/**
 * Records a {@link PartialExecution}. If info logging is enabled, also logs its measured and estimated time.
 *
 * @param partialExecution the {@link PartialExecution} to record
 */
@Override
public void add(PartialExecution partialExecution) {
    this.partialExecutions.add(partialExecution);
    if (!this.logger.isInfoEnabled()) {
        return;
    }
    // Only compute the (potentially costly) log arguments when they are actually logged.
    this.logger.info(
            "Executed {} items in {} (estimated {}).",
            partialExecution.getAtomicExecutionGroups().size(),
            Formats.formatDuration(partialExecution.getMeasuredExecutionTime()),
            partialExecution.getOverallTimeEstimate(this.getConfiguration())
    );
}
/**
 * Provides the {@link PartialExecution}s recorded so far.
 *
 * @return the internal (live, mutable) collection of {@link PartialExecution}s
 */
@Override
public Collection<PartialExecution> getPartialExecutions() {
    final Collection<PartialExecution> executions = this.partialExecutions;
    return executions;
}
/**
 * Sets a new {@link Breakpoint} for this instance.
 * <p>
 * {@code null} is treated as {@link Breakpoint#NONE}. Otherwise the next breakpoint check before executing an
 * {@link ExecutionStage} would fail with a {@link NullPointerException}.
 *
 * @param breakpoint the new {@link Breakpoint}, or {@code null} for no breakpoint
 */
public void setBreakpoint(Breakpoint breakpoint) {
    this.breakpoint = breakpoint == null ? Breakpoint.NONE : breakpoint;
}
/**
 * Tells whether this instance currently objects to changes of the {@link ExecutionPlan}, e.g., on
 * re-optimization. This is the case as long as any {@link ExecutionStageLoopContext} is registered.
 *
 * @return whether this instance is vetoing on changes
 */
public boolean isVetoingPlanChanges() {
    final boolean hasLoopContexts = !this.loopContexts.isEmpty();
    return hasLoopContexts;
}
/**
 * Releases all global {@link ExecutionResource}s held by this instance, such as its {@link Executor}s.
 */
public void shutdown() {
    for (ExecutionResource resource : this.globalResources) {
        resource.noteDiscardedReference(true);
    }
    this.globalResources.clear();
}
/**
 * Provides the {@link ExecutionStage}s that have been executed by this instance so far.
 *
 * @return the internal (live, mutable) set of completed {@link ExecutionStage}s
 */
public Set<ExecutionStage> getCompletedStages() {
    final Set<ExecutionStage> stages = this.completedStages;
    return stages;
}
/**
 * Provides the {@link Configuration} of the executed {@link Job}.
 *
 * @return the {@link Job}'s {@link Configuration}
 */
public Configuration getConfiguration() {
    final Job executedJob = this.job;
    return executedJob.getConfiguration();
}
/**
* Observes the {@link CrossPlatformExecutor} execution state in order to tell when the input dependencies of
* a {@link ExecutionStage} are satisfied so that it can be activated.
* <p>
* While alive, an instance holds references to the input {@link ChannelInstance}s it has collected. If its
* {@link ExecutionStage} is part of an {@link ExecutionStageLoop}, it also holds a reference to the according
* {@link ExecutionStageLoopContext}. These references are released by {@link #dispose()}.
*/
private class StageActivator {
/**
* The {@link ExecutionStage} being activated.
*/
private final ExecutionStage stage;
/**
* The {@link OptimizationContext} for the {@link #stage}.
*/
private final OptimizationContext optimizationContext;
/**
* Regular inbound {@link Channel}s to the {@link #stage}. For loop heads, these are the inbound {@link Channel}s
* that feed neither an initialization nor an iteration {@link InputSlot}.
*/
private final Collection<Channel> miscInboundChannels = new LinkedList<>();
/**
* Inbound {@link Channel}s to the {@link #stage}, that implement an initialization {@link InputSlot}
* of a {@link LoopHeadOperator}.
*/
private final Collection<Channel> initializationInboundChannels = new LinkedList<>();
/**
* Inbound {@link Channel}s to the {@link #stage}, that implement an iteration {@link InputSlot}
* of a {@link LoopHeadOperator}.
*/
private final Collection<Channel> iterationInboundChannels = new LinkedList<>();
/**
* Inbound {@link Channel}s to the {@link #stage}, that are loop invariant, i.e., that are needed across
* several iterations. Of course, this is only possible if {@link #stage} is in an {@link ExecutionStageLoop}.
*/
private final Collection<Channel> loopInvariantInboundChannels = new LinkedList<>();
/**
* If the {@link #stage} is part of a {@link ExecutionStageLoop}, we need to keep track of an associated
* runtime {@link ExecutionStageLoopContext} that manages loop invariant {@link ExecutionResource}s. In particular,
* this instance is a stake-holder of this {@link ExecutionStageLoopContext} and tells that it must not be
* removed. {@code null} if the {@link #stage} is not in a loop.
*/
private final ExecutionStageLoopContext loopContext;
/**
* Keep track of the {@link ChannelInstance}s that are inputs of the {@link #stage}. A reference is obtained on
* each of them when it is added.
*/
private final Map<Channel, ChannelInstance> inputChannelInstances = new HashMap<>(4);
/**
* Creates a new instance and sorts the inbound {@link Channel}s of the {@code stage} into the
* different {@link Channel} categories of this class.
*
* @param stage that should be activated
* @param optimizationContext the {@link OptimizationContext} for the {@code stage}
*/
private StageActivator(ExecutionStage stage, OptimizationContext optimizationContext) {
this.stage = stage;
this.optimizationContext = optimizationContext;
// Stages in a loop keep the loop's ExecutionStageLoopContext alive until this instance is disposed.
if (this.stage.getLoop() != null) {
this.loopContext = CrossPlatformExecutor.this.getOrCreateLoopContext(this.stage.getLoop());
this.loopContext.noteObtainedReference();
} else {
this.loopContext = null;
}
// Distinguish the inbound Channels of the stage.
final Collection<Channel> inboundChannels = this.stage.getInboundChannels();
if (this.stage.isLoopHead()) {
assert this.stage.getAllTasks().size() == 1 : String.format("Loop head stage looks like this:\n%s", this.stage.getPlanAsString("! "));
// Loop heads are special in the sense that they don't require all of their inputs.
for (Channel inboundChannel : inboundChannels) {
for (ExecutionTask executionTask : inboundChannel.getConsumers()) {
// Avoid consumers from other ExecutionStages.
if (executionTask.getStage() != this.stage) continue;
// Check special inputs with iteration semantics.
if (executionTask.getOperator().isLoopHead()) {
LoopHeadOperator loopHead = (LoopHeadOperator) executionTask.getOperator();
final InputSlot<?> targetInput = executionTask.getInputSlotFor(inboundChannel);
if (loopHead.getLoopBodyInputs().contains(targetInput)) {
this.iterationInboundChannels.add(inboundChannel);
continue;
} else if (loopHead.getLoopInitializationInputs().contains(targetInput)) {
this.initializationInboundChannels.add(inboundChannel);
continue;
}
}
// Otherwise, we treat it as a regular inbound Channel. Such inputs of a loop head are expected to
// come from outside of the loop and are therefore treated as loop invariant.
this.miscInboundChannels.add(inboundChannel);
assert this.checkIfIsLoopInput(inboundChannel) :
String.format("%s is not a loop input as expected.", inboundChannel);
this.loopInvariantInboundChannels.add(inboundChannel);
}
}
} else {
// If we do not have a loop head, we treat all Channels as regular ones.
this.miscInboundChannels.addAll(inboundChannels);
// Still, the Channels might be loop inputs.
for (Channel inboundChannel : inboundChannels) {
if (this.checkIfIsLoopInput(inboundChannel)) {
this.loopInvariantInboundChannels.add(inboundChannel);
}
}
}
}
/**
* Checks if the given {@link Channel} (inbound to {@link #stage}), is entering an {@link ExecutionStageLoop}
* when serving the {@link #stage}.
*
* @param inboundChannel {@link Channel} that is inbound to {@link #stage} and that might be a {@link ExecutionStageLoop} input
* @return whether the {@link Channel} is a {@link ExecutionStageLoop} input, i.e., it is not produced in an {@link ExecutionStageLoop}
* while this {@link #stage} is part of a {@link ExecutionStageLoop}
*/
private boolean checkIfIsLoopInput(Channel inboundChannel) {
// NB: This code assumes no nested loops.
return this.stage.getLoop() != null && this.stage.getLoop() != inboundChannel.getProducer().getStage().getLoop();
}
/**
* Try to satisfy the input {@link ChannelInstance} requirements by updating {@link #inputChannelInstances}.
* Loop heads additionally need their initialization inputs (before the loop has started) or their iteration
* inputs (while the loop is running).
*
* @return whether the activation is possible
*/
boolean updateInputChannelInstances() {
boolean isMiscChannelsReady = this.updateChannelInstances(this.miscInboundChannels, false);
boolean isLoopChannelsReady = true;
if (this.stage.isLoopHead()) {
LoopHeadOperator loopOperator = (LoopHeadOperator) this.stage.getLoopHeadTask().getOperator();
switch (loopOperator.getState()) {
case NOT_STARTED:
isLoopChannelsReady = this.updateChannelInstances(this.initializationInboundChannels, false);
break;
case RUNNING:
// Iteration inputs are fed back from the previous iteration, so look them up as feedback.
isLoopChannelsReady = this.updateChannelInstances(this.iterationInboundChannels, true);
break;
default:
// Any other state: per the log message below, the loop is considered finished.
logger.warn("Tried to update input channel instances for finished {}.", this.stage);
isLoopChannelsReady = false;
}
}
// Both updates above run unconditionally (no short-circuiting) so that all available inputs get referenced.
return isMiscChannelsReady && isLoopChannelsReady;
}
/**
* Try to satisfy the input {@link ChannelInstance} requirements for all given {@link Channel}s by updating {@link #inputChannelInstances}.
*
* @param channels for that the {@link ChannelInstance}s are requested
* @param isFeedback passed on to {@link CrossPlatformExecutor#getChannelInstance(Channel, boolean)} as the
* {@code isPeekingToNextTransition} flag
* @return whether there are {@link ChannelInstance}s for all {@link Channel}s available
*/
private boolean updateChannelInstances(Collection<Channel> channels, boolean isFeedback) {
boolean isAllChannelsAvailable = true;
for (Channel channel : channels) {
// Check if the ChannelInstance is already known.
if (this.inputChannelInstances.containsKey(channel)) continue;
// Otherwise, check if it is available now.
final ChannelInstance channelInstance = CrossPlatformExecutor.this.getChannelInstance(channel, isFeedback);
if (channelInstance != null) {
// If so, reference it.
this.inputChannelInstances.put(channel, channelInstance);
channelInstance.noteObtainedReference();
// Also, check if this is a loop invariant input (only possible for stages in a loop, so loopContext is set).
if (this.loopInvariantInboundChannels.contains(channel)) {
this.loopContext.registerLoopInvariant(channelInstance);
}
} else {
// Otherwise, we will have a negative result. Still, we need to grab all Channels to save them from disposal.
isAllChannelsAvailable = false;
}
}
return isAllChannelsAvailable;
}
/**
* Getter for the {@link ExecutionStage} activated by this instance.
*
* @return the {@link ExecutionStage}
*/
public ExecutionStage getStage() {
return this.stage;
}
/**
* Getter for the {@link OptimizationContext} of the {@link #stage}.
*
* @return the {@link OptimizationContext}
*/
public OptimizationContext getOptimizationContext() {
return optimizationContext;
}
/**
* Dispose this instance, in particular discard references to its {@link ChannelInstance}s and to the
* {@link #loopContext} (if any).
* NOTE(review): not idempotent — a second call would discard the {@link #loopContext} reference again.
*/
void dispose() {
// Release the inputChannelInstances.
for (ChannelInstance channelInstance : this.inputChannelInstances.values()) {
channelInstance.noteDiscardedReference(true);
}
this.inputChannelInstances.clear();
// Release the loopContext.
if (this.loopContext != null) {
this.loopContext.noteDiscardedReference(true);
}
}
/**
* Notifies this instance that it has been activated. For loop heads, this advances the
* {@link #loopContext} to the next iteration (assumes loop heads always belong to a loop, so that
* {@link #loopContext} is non-null).
*/
public void noteActivation() {
if (this.stage.isLoopHead()) this.loopContext.activateNextIteration();
}
}
/**
* Keeps track of {@link ExecutionResource}s of an {@link ExecutionStageLoop}.
*/
private class ExecutionStageLoopContext extends AbstractReferenceCountable {
/**
* The {@link ExecutionStageLoop} being described by this instance.
*/
private final ExecutionStageLoop loop;
/**
* Maintains the loop invariant {@link ExecutionResource}s. Set to {@code null} when this instance is disposed,
* which also serves as the disposal marker checked by {@link #isDisposed()}.
*/
private Set<ExecutionResource> loopInvariants = new HashSet<>(4);
/**
* Iteration contexts of the loop. {@code currentIteration} receives iteration-local {@link ChannelInstance}s.
* {@code nextTransition} receives feedback {@link ChannelInstance}s for the next iteration. {@code prevTransition}
* is the former {@code nextTransition}, aged by {@link #activateNextIteration()}.
*/
private ExecutionStageLoopIterationContext currentIteration, prevTransition, nextTransition;
/**
 * Creates a new instance for the given {@link ExecutionStageLoop}.
 *
 * @param executionStageLoop the loop that the new instance describes
 */
public ExecutionStageLoopContext(ExecutionStageLoop executionStageLoop) {
    this.loop = executionStageLoop;
}
/**
 * Registers a loop invariant {@link ExecutionResource} with this instance. A reference is obtained only the
 * first time a resource is registered.
 *
 * @param loopInvariant the said {@link ExecutionResource}
 */
void registerLoopInvariant(ExecutionResource loopInvariant) {
    final boolean isNewLoopInvariant = this.loopInvariants.add(loopInvariant);
    if (!isNewLoopInvariant) {
        return;
    }
    loopInvariant.noteObtainedReference();
}
/**
 * Advances this instance to the next iteration. The current iteration context is released and replaced by a
 * fresh one. The previous transition is released, the next transition becomes the previous one, and the
 * next transition is reset.
 */
public void activateNextIteration() {
    logger.info("Activating next iteration.");
    final ExecutionStageLoopIterationContext finishedIteration = this.currentIteration;
    if (finishedIteration != null) {
        finishedIteration.noteDiscardedReference(true);
    }
    this.currentIteration = this.createIterationContext();

    final ExecutionStageLoopIterationContext outdatedTransition = this.prevTransition;
    if (outdatedTransition != null) {
        outdatedTransition.noteDiscardedReference(true);
    }
    this.prevTransition = this.nextTransition;
    this.nextTransition = null;
}
/**
 * Creates a new {@link ExecutionStageLoopIterationContext} for this instance and obtains a reference to it.
 *
 * @return the new, already referenced {@link ExecutionStageLoopIterationContext}
 */
private ExecutionStageLoopIterationContext createIterationContext() {
    final ExecutionStageLoopIterationContext newIteration = new ExecutionStageLoopIterationContext(this);
    newIteration.noteObtainedReference();
    return newIteration;
}
/**
 * Provides the {@link ExecutionStageLoopIterationContext} for the next transition, creating it lazily.
 *
 * @return the {@link ExecutionStageLoopIterationContext} for the next transition
 */
public ExecutionStageLoopIterationContext getOrCreateNextTransition() {
    if (this.nextTransition != null) {
        return this.nextTransition;
    }
    this.nextTransition = this.createIterationContext();
    return this.nextTransition;
}
@Override
protected void disposeUnreferenced() {
for (ExecutionResource loopInvariant : this.loopInvariants) {
loopInvariant.noteDiscardedReference(true);
}
this.loopInvariants = null;
CrossPlatformExecutor.this.removeLoopContext(this.loop);
if (this.prevTransition != null) this.prevTransition.noteDiscardedReference(true);
if (this.currentIteration != null) this.currentIteration.noteDiscardedReference(true);
if (this.nextTransition != null) this.nextTransition.noteDiscardedReference(true);
}
@Override
public boolean isDisposed() {
return this.loopInvariants == null;
}
/**
* Registers the given {@link ChannelInstance} with appropriate {@link ExecutionStageLoopIterationContext}s.
*
* @param channelInstance the {@link ChannelInstance}
*/
public void register(ChannelInstance channelInstance) {
final Channel channel = channelInstance.getChannel();
boolean isFeedback = false, isIterationLocal = false;
for (ExecutionTask consumer : channel.getConsumers()) {
if (consumer.isFeedbackInput(channel)) {
isFeedback = true;
} else {
isIterationLocal = true;
}
}
if (isIterationLocal) this.currentIteration.register(channelInstance);
if (isFeedback) this.getOrCreateNextTransition().register(channelInstance);
}
/**
* Provide a {@link ChannelInstance} from this instance. The instance can be provided from the previous or
* next transition or the current {@link ExecutionStageLoopIterationContext}.
*
* @param channel whose {@link ChannelInstance} should be provided
* @param isPeekingToNextTransition whether the next transition {@link ExecutionStageLoopIterationContext}
* may be accessed
* @return the {@link ChannelInstance} or {@code null} if it cannot be found
*/
public ChannelInstance getChannelInstance(Channel channel,
boolean isPeekingToNextTransition) {
if (isPeekingToNextTransition) {
return this.getOrCreateNextTransition().getChannelInstance(channel);
}
if (this.prevTransition != null) {
final ChannelInstance channelInstance = this.prevTransition.getChannelInstance(channel);
if (channelInstance != null) return channelInstance;
}
if (this.currentIteration != null) {
return this.currentIteration.getChannelInstance(channel);
}
return null;
}
/**
* Removes the previous transition {@link ExecutionStageLoopIterationContext}. Included resources
* will not be provided anymore by this instance.
*/
public void scrapPreviousTransitionContext() {
if (this.prevTransition != null) this.prevTransition.noteDiscardedReference(true);
this.prevTransition = null;
}
}
/**
 * Tracks the {@link ChannelInstance}s that belong to a single iteration, or to a transition between two
 * iterations, of an {@link ExecutionStageLoop}. This context holds one reference on each registered instance
 * until the instance is replaced or this context is disposed.
 */
private class ExecutionStageLoopIterationContext extends AbstractReferenceCountable {

    /**
     * The {@link ExecutionStageLoopContext} that created this instance.
     */
    private final ExecutionStageLoopContext loopContext;

    /**
     * Registered {@link ChannelInstance}s keyed by their {@link Channel}. Becomes {@code null} once this
     * instance is disposed.
     */
    private Map<Channel, ChannelInstance> channelInstances = new HashMap<>(8);

    /**
     * Creates a new instance.
     *
     * @param loopContext the owning {@link ExecutionStageLoopContext}
     */
    private ExecutionStageLoopIterationContext(ExecutionStageLoopContext loopContext) {
        this.loopContext = loopContext;
    }

    @Override
    protected void disposeUnreferenced() {
        // Give back the reference taken in register() for every instance still held.
        this.channelInstances.values().forEach(instance -> instance.noteDiscardedReference(true));
        this.channelInstances = null;
    }

    /**
     * Registers a {@link ChannelInstance} with this instance. Any instance previously registered for the same
     * {@link Channel} is replaced and released.
     *
     * @param channelInstance the {@link ChannelInstance} to register
     */
    public void register(ChannelInstance channelInstance) {
        // Obtain the new reference before releasing the displaced one. If the very same instance is registered
        // again, its count then never dips in between.
        channelInstance.noteObtainedReference();
        final Channel key = channelInstance.getChannel();
        final ChannelInstance displaced = this.channelInstances.put(key, channelInstance);
        if (displaced != null) {
            displaced.noteDiscardedReference(true);
        }
    }

    /**
     * Looks up the {@link ChannelInstance} registered for a {@link Channel}.
     *
     * @param channel the {@link Channel} to look up
     * @return the registered {@link ChannelInstance}, or {@code null} if there is none
     */
    public ChannelInstance getChannelInstance(Channel channel) {
        final ChannelInstance registered = this.channelInstances.get(channel);
        return registered;
    }
}
/**
 * Executes {@link ExecutionStage}s in parallel threads.
 * <p>
 * Each instance repeatedly polls an activated {@link StageActivator} from the {@link CrossPlatformExecutor},
 * executes its stage, and then spawns one new {@link ParallelExecutionThread} per additional activated stage.
 * The spawning is recursive, since spawned threads do the same. An instance keeps looping as long as activated
 * stages remain and at least one other thread is still running.
 * <p>
 * The following shared bookkeeping of the {@link CrossPlatformExecutor} is accessed while holding the monitor of
 * {@link #crossPlatformExecutor}:
 * <ul>
 *   <li>the activated-stage queue,</li>
 *   <li>the list of spawned threads,</li>
 *   <li>the completed-thread counter,</li>
 *   <li>the obsolete-{@link ChannelInstance} sweep.</li>
 * </ul>
 */
private class ParallelExecutionThread implements Runnable {

    /**
     * Identifier of this {@link ParallelExecutionThread}. It is used in log messages and as the suffix of the
     * identifiers of spawned threads.
     */
    public String threadId;

    /**
     * If {@code false}, each polled {@link StageActivator} is first checked against the breakpoint.
     */
    private boolean thread_isBreakpointDisabled;

    /**
     * The {@link CrossPlatformExecutor} whose state this thread works on.
     */
    private final CrossPlatformExecutor crossPlatformExecutor;

    /**
     * Creates a new instance.
     *
     * @param isBreakpointsDisabled whether breakpoint checks should be skipped
     * @param id                    identifier of the new thread
     * @param cpe                   the {@link CrossPlatformExecutor} to work on
     */
    public ParallelExecutionThread(boolean isBreakpointsDisabled, String id, CrossPlatformExecutor cpe) {
        this.thread_isBreakpointDisabled = isBreakpointsDisabled;
        this.threadId = id;
        this.crossPlatformExecutor = cpe;
    }

    /**
     * Execution code of the thread.
     * <p>
     * The completed-thread counter is incremented in a {@code finally} block on every exit path. That includes
     * suspension at a breakpoint and exceptions thrown while executing a stage, so
     * {@code parallelExecutionThreads.size() - completedThreads} always reflects the threads still running.
     */
    @Override
    public void run() {
        final CrossPlatformExecutor cpe = this.crossPlatformExecutor;
        cpe.logger.info("Thread {} started", this.threadId);
        try {
            // Loop until there is no activated stage or only one thread running.
            do {
                final StageActivator stageActivator;
                synchronized (cpe) {
                    stageActivator = cpe.activatedStageActivators.poll();
                }
                if (stageActivator == null) {
                    break;
                }
                cpe.logger.info("{} started executing Stage: {}:", this.threadId, stageActivator.getStage());

                // Check if the breakpoint permits the execution.
                if (!this.thread_isBreakpointDisabled && cpe.suspendIfBreakpointRequest(stageActivator)) {
                    return;
                }

                this.executeStage(stageActivator);
                this.disposeObsoleteChannelInstances();
                cpe.logger.info("{} completed executing Stage : {}:", this.threadId, stageActivator.getStage());

                this.spawnThreadsForPendingStages();
            } while (this.shouldKeepLooping());
        } finally {
            // "++" is a read-modify-write and is not atomic even on a volatile field, so guard it with the lock.
            synchronized (cpe) {
                cpe.completedThreads++;
            }
            cpe.logger.info("{} ended", this.threadId);
        }
    }

    /**
     * Executes the stage of the given {@link StageActivator} and records it as completed. For a loop head, it
     * also scraps the loop's previous transition context. It then tries to activate the successor stages and
     * finally disposes the {@link StageActivator}.
     *
     * @param stageActivator the activator whose stage should be executed
     */
    private void executeStage(StageActivator stageActivator) {
        final CrossPlatformExecutor cpe = this.crossPlatformExecutor;
        final ExecutionStage stage = stageActivator.getStage();
        final OptimizationContext optimizationContext = stageActivator.getOptimizationContext();

        // Find parts of the stage to instrument.
        cpe.instrumentationStrategy.applyTo(stage);

        // Obtain an Executor for the stage and have the execution done.
        final Executor executor = cpe.getOrCreateExecutorFor(stage);
        cpe.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
        final long startTime = System.currentTimeMillis();
        // Serialize stages that share the same Executor.
        // NOTE(review): the bookkeeping below mutates state shared across executors while holding only this
        // executor's lock. Confirm whether tryToActivateSuccessors synchronizes internally.
        synchronized (executor) {
            executor.execute(stage, optimizationContext, cpe);
            final long finishTime = System.currentTimeMillis();
            cpe.logger.info("Executed {} in {}.", stage, Formats.formatDuration(finishTime - startTime, true));

            // Remember that we have executed the stage.
            cpe.completedStages.add(stage);
            if (stage.isLoopHead()) {
                cpe.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
            }

            // Try to activate the successor stages.
            cpe.tryToActivateSuccessors(stageActivator);
        }

        // The activator collected the input ChannelInstances and is no longer needed.
        stageActivator.dispose();
    }

    /**
     * Removes every {@link ChannelInstance} from the executor's map whose only remaining reference is the map
     * itself, releasing that last reference. Holds the executor's monitor so that sibling threads running the
     * same sweep cannot iterate the map concurrently or discard an instance twice.
     */
    private void disposeObsoleteChannelInstances() {
        final CrossPlatformExecutor cpe = this.crossPlatformExecutor;
        synchronized (cpe) {
            final Iterator<Map.Entry<Channel, ChannelInstance>> iterator = cpe.channelInstances.entrySet().iterator();
            while (iterator.hasNext()) {
                final ChannelInstance channelInstance = iterator.next().getValue();
                // If the map holds the only remaining reference, discard the instance.
                if (channelInstance.getNumReferences() == 1) {
                    channelInstance.noteDiscardedReference(true);
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Spawns one new {@link ParallelExecutionThread} per activated stage beyond the first. This thread keeps
     * working on the remaining one.
     * <p>
     * The queue size is read once up front, because spawned threads start draining the queue immediately. Each
     * thread is started and registered atomically w.r.t. the executor's monitor. This keeps a child from
     * counting itself as completed before it appears in {@code parallelExecutionThreads}.
     */
    private void spawnThreadsForPendingStages() {
        final CrossPlatformExecutor cpe = this.crossPlatformExecutor;
        final int numActivatedStages;
        synchronized (cpe) {
            numActivatedStages = cpe.activatedStageActivators.size();
        }
        for (int i = 1; i < numActivatedStages; i++) {
            // TODO: Better use Java's ForkJoinPool to reduce thread creation overhead and control concurrency.
            final Thread thread = new Thread(new ParallelExecutionThread(
                    this.thread_isBreakpointDisabled, "T" + i + "@" + this.threadId, cpe));
            synchronized (cpe) {
                thread.start();
                cpe.parallelExecutionThreads.add(thread);
            }
        }
    }

    /**
     * Decides whether this thread should poll for another stage.
     *
     * @return {@code true} iff activated stages remain and at least one other thread is still running
     */
    private boolean shouldKeepLooping() {
        final CrossPlatformExecutor cpe = this.crossPlatformExecutor;
        synchronized (cpe) {
            return cpe.activatedStageActivators.size() >= 1
                    && cpe.parallelExecutionThreads.size() - cpe.completedThreads > 1;
        }
    }
}
}