blob: 7fdddddad13143b2a54b6642c6cd00085cbb1237 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.java.execution;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.ExtendedFunction;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.PartialExecution;
import org.apache.wayang.core.platform.PushExecutorTemplate;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Formats;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.java.compiler.FunctionCompiler;
import org.apache.wayang.java.operators.JavaExecutionOperator;
import org.apache.wayang.java.platform.JavaPlatform;
/**
 * {@link Executor} implementation for the {@link JavaPlatform}.
 * <p>
 * Each {@link ExecutionTask} handed to {@link #execute} is run by casting its operator to a
 * {@link JavaExecutionOperator} and invoking its {@code evaluate} method. A {@link FunctionCompiler}
 * built from the {@link Job}'s configuration is exposed via {@link #getCompiler()}.
 */
public class JavaExecutor extends PushExecutorTemplate {

    /** Conversion factor between {@link System#nanoTime()} readings and milliseconds. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /** The platform this executor belongs to. */
    private final JavaPlatform platform;

    /** Compiler created from the configuration of the {@link Job} passed to the constructor. */
    private final FunctionCompiler compiler;

    /**
     * Creates a new instance.
     *
     * @param javaPlatform the {@link JavaPlatform} returned by {@link #getPlatform()}
     * @param job          the {@link Job} this executor works for; its configuration is used to
     *                     create the {@link FunctionCompiler}
     */
    public JavaExecutor(JavaPlatform javaPlatform, Job job) {
        super(job);
        this.platform = javaPlatform;
        this.compiler = new FunctionCompiler(job.getConfiguration());
    }

    /**
     * @return the {@link JavaPlatform} this executor was created with
     */
    @Override
    public JavaPlatform getPlatform() {
        return this.platform;
    }

    /**
     * Executes a single {@link ExecutionTask} by evaluating its {@link JavaExecutionOperator}.
     * <p>
     * Progress is reported to the job before (50) and after (100) the evaluation. The time spent in
     * the evaluation is measured in milliseconds and forwarded to {@code createPartialExecution}.
     *
     * @param task                    the task to execute; its operator must be a {@link JavaExecutionOperator}
     * @param inputChannelInstances   the inputs for the task's operator
     * @param producerOperatorContext context information passed on to the operator
     * @param isRequestEagerExecution whether eager execution was requested; only used for logging here
     * @return the output {@link ChannelInstance}s created for the task together with the
     * {@link PartialExecution} (which may be {@code null})
     * @throws WayangException if the operator evaluation throws any {@link Exception}; the original
     *                         exception is preserved as the cause
     */
    @Override
    protected Tuple<List<ChannelInstance>, PartialExecution> execute(
            ExecutionTask task,
            List<ChannelInstance> inputChannelInstances,
            OptimizationContext.OperatorContext producerOperatorContext,
            boolean isRequestEagerExecution
    ) {
        // Provide the ChannelInstances for the output of the task.
        final ChannelInstance[] outputChannelInstances = task.getOperator().createOutputChannelInstances(
                this, task, producerOperatorContext, inputChannelInstances
        );

        // Execute.
        final Collection<ExecutionLineageNode> executionLineageNodes;
        final Collection<ChannelInstance> producedChannelInstances;

        // TODO: Use proper progress estimator.
        this.job.reportProgress(task.getOperator().getName(), 50);

        // Use the monotonic clock: wall-clock time (System.currentTimeMillis()) may jump when the
        // system clock is adjusted, which would yield negative or inflated durations.
        final long startNanos = System.nanoTime();
        try {
            final Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> results =
                    cast(task.getOperator()).evaluate(
                            toArray(inputChannelInstances),
                            outputChannelInstances,
                            this,
                            producerOperatorContext
                    );
            executionLineageNodes = results.getField0();
            producedChannelInstances = results.getField1();
        } catch (Exception e) {
            throw new WayangException(String.format("Executing %s failed.", task), e);
        }
        // Keep the duration in milliseconds, the unit that was passed on before.
        final long executionDuration = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;

        this.job.reportProgress(task.getOperator().getName(), 100);

        // Check how much we executed.
        final PartialExecution partialExecution =
                this.createPartialExecution(executionLineageNodes, executionDuration);
        // No partial execution was created, yet the evaluation took more than 10 ms: flag it.
        if (partialExecution == null && executionDuration > 10) {
            this.logger.warn("Execution of {} took suspiciously long ({}).", task, Formats.formatDuration(executionDuration));
        }

        // Collect any cardinality updates.
        this.registerMeasuredCardinalities(producedChannelInstances);

        // Log (at info level) if requested eager execution did not take place.
        if (isRequestEagerExecution && partialExecution == null) {
            this.logger.info("{} was not executed eagerly as requested.", task);
        }

        return new Tuple<>(Arrays.asList(outputChannelInstances), partialExecution);
    }

    /**
     * Casts the given operator to a {@link JavaExecutionOperator}.
     *
     * @param executionOperator the operator to cast
     * @return the same instance, typed as {@link JavaExecutionOperator}
     * @throws ClassCastException if the operator is not a {@link JavaExecutionOperator}
     */
    private static JavaExecutionOperator cast(ExecutionOperator executionOperator) {
        return (JavaExecutionOperator) executionOperator;
    }

    /**
     * Copies the given {@link ChannelInstance}s into a new array.
     *
     * @param channelInstances the instances to copy
     * @return an array containing the list's elements in list order
     */
    private static ChannelInstance[] toArray(List<ChannelInstance> channelInstances) {
        return channelInstances.toArray(new ChannelInstance[0]);
    }

    /**
     * Utility function to open an {@link ExtendedFunction}.
     * <p>
     * The function is opened with a new {@link JavaExecutionContext} that carries the operator, its
     * inputs, and the iteration number taken from the operator context's {@link OptimizationContext}.
     *
     * @param operator        the {@link JavaExecutionOperator} containing the function
     * @param function        the {@link ExtendedFunction}; if it is of a different type, nothing happens
     * @param inputs          the input {@link ChannelInstance}s for the {@code operator}
     * @param operatorContext context information for the {@code operator}
     */
    public static void openFunction(JavaExecutionOperator operator,
                                    Object function,
                                    ChannelInstance[] inputs,
                                    OptimizationContext.OperatorContext operatorContext) {
        if (function instanceof ExtendedFunction) {
            ExtendedFunction extendedFunction = (ExtendedFunction) function;
            int iterationNumber = operatorContext.getOptimizationContext().getIterationNumber();
            extendedFunction.open(new JavaExecutionContext(operator, inputs, iterationNumber));
        }
    }

    /**
     * @return the {@link FunctionCompiler} created for this executor
     */
    public FunctionCompiler getCompiler() {
        return this.compiler;
    }
}