blob: dd49737f832d69621837f5f3d0ed9b795cd9382c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.platform;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionStageLoop;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.AbstractReferenceCountable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Implements the {@link ExecutionResource} handling as defined by {@link Executor}.
*/
public abstract class ExecutorTemplate extends AbstractReferenceCountable implements Executor {
protected final Logger logger = LogManager.getLogger(this.getClass());
/**
* Provides IDs to distinguish instances humanly readable.
*/
private static final AtomicInteger idGenerator = new AtomicInteger(0);
/**
* The {@link CrossPlatformExecutor} that instruments this instance or {@code null} if none
*/
private final CrossPlatformExecutor crossPlatformExecutor;
/**
* Resources being held by this instance.
*/
private final Set<ExecutionResource> registeredResources = new HashSet<>();
/**
* ID of this instance.
*/
private final int id = idGenerator.getAndIncrement();
/**
* Creates a new instance.
*
* @param crossPlatformExecutor the {@link CrossPlatformExecutor} that instruments this instance or {@code null} if none
*/
protected ExecutorTemplate(CrossPlatformExecutor crossPlatformExecutor) {
this.crossPlatformExecutor = crossPlatformExecutor;
}
@Override
protected void disposeUnreferenced() {
this.dispose();
}
@Override
public void register(ExecutionResource resource) {
if (!this.registeredResources.add(resource)) {
this.logger.warn("Registered {} twice.", resource);
}
}
@Override
public void unregister(ExecutionResource resource) {
if (!this.registeredResources.remove(resource)) {
this.logger.warn("Could not unregister {}, as it was not registered.", resource);
}
}
/**
* Select the produced {@link ChannelInstance}s that are marked for instrumentation and register them if they
* contain a measured cardinality.
*
* @param producedChannelInstances the {@link ChannelInstance}s
*/
protected void registerMeasuredCardinalities(Collection<ChannelInstance> producedChannelInstances) {
for (ChannelInstance producedChannelInstance : producedChannelInstances) {
if (!producedChannelInstance.wasProduced()) {
this.logger.error("Expected {} to be produced, but is not flagged as such.", producedChannelInstance);
continue;
}
if (producedChannelInstance.isMarkedForInstrumentation()) {
this.registerMeasuredCardinality(producedChannelInstance);
}
}
}
/**
* If the given {@link ChannelInstance} has a measured cardinality, then register this cardinality in the
* {@link #crossPlatformExecutor} with the corresponding {@link Channel} and all its siblings.
*
* @param channelInstance the said {@link ChannelInstance}
*/
protected void registerMeasuredCardinality(ChannelInstance channelInstance) {
// Check if a cardinality was measured in the first place.
final OptionalLong optionalCardinality = channelInstance.getMeasuredCardinality();
if (!optionalCardinality.isPresent()) {
if (channelInstance.getChannel().isMarkedForInstrumentation()) {
this.logger.warn(
"No cardinality available for {}, although it was requested.", channelInstance.getChannel()
);
}
return;
}
this.crossPlatformExecutor.addCardinalityMeasurement(channelInstance);
}
/**
* Checks whether the given {@link Channel} is inside of a {@link ExecutionStageLoop}.
*
* @param channel the said {@link Channel}
* @return whether the {@link Channel} is in a {@link ExecutionStageLoop}
*/
private static boolean checkIfIsInLoopChannel(Channel channel) {
final ExecutionStageLoop producerLoop = channel.getProducer().getStage().getLoop();
return producerLoop != null && channel.getConsumers().stream().anyMatch(
consumer -> consumer.getStage().getLoop() == producerLoop
);
}
/**
* Create a {@link PartialExecution} according to the given parameters.
*
* @param executionLineageNodes {@link ExecutionLineageNode}s reflecting what has been executed
* @param executionDuration the measured execution duration in milliseconds
* @return the {@link PartialExecution} or {@code null} if nothing has been executed
*/
protected PartialExecution createPartialExecution(
Collection<ExecutionLineageNode> executionLineageNodes,
long executionDuration) {
if (executionLineageNodes.isEmpty()) return null;
final PartialExecution partialExecution = PartialExecution.createFromMeasurement(
executionDuration, executionLineageNodes, this.getConfiguration()
);
return partialExecution;
}
private static String formatCardinalities(OptimizationContext.OperatorContext opCtx) {
StringBuilder sb = new StringBuilder().append('[');
String separator = "";
final CardinalityEstimate[] inputCardinalities = opCtx.getInputCardinalities();
for (int inputIndex = 0; inputIndex < inputCardinalities.length; inputIndex++) {
if (inputCardinalities[inputIndex] != null) {
String slotName = opCtx.getOperator().getNumInputs() > inputIndex ?
opCtx.getOperator().getInput(inputIndex).getName() :
"(none)";
sb.append(separator).append(slotName).append(": ").append(inputCardinalities[inputIndex]);
separator = ", ";
}
}
return sb.append(']').toString();
}
@Override
public void dispose() {
if (this.getNumReferences() != 0) {
this.logger.warn("Disposing {} although it is still being referenced.", this);
}
for (ExecutionResource resource : new ArrayList<>(this.registeredResources)) {
resource.dispose();
}
if (this.getNumReferences() > 0) {
this.logger.warn("There are still {} referenced on {}, which is about to be disposed.", this.getNumReferences(), this);
}
}
@Override
public CrossPlatformExecutor getCrossPlatformExecutor() {
return this.crossPlatformExecutor;
}
@Override
public String toString() {
return String.format("%s[%x]", this.getClass().getSimpleName(), this.id);
}
public Configuration getConfiguration() {
return this.crossPlatformExecutor.getConfiguration();
}
}