/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.profiler.spark;
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.profiler.util.ProfilingUtils;
import org.apache.wayang.profiler.util.RrdAccessor;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.compiler.FunctionCompiler;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import org.rrd4j.ConsolFun;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
/**
* Allows instrumenting and profiling a {@link SparkExecutionOperator}.
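* <p>A minimal usage sketch. {@code MySparkOperatorProfiler} stands in for a concrete subclass, and the
* constructor arguments are assumed to be in scope; both are hypothetical and for illustration only:</p>
* <pre>{@code
* SparkOperatorProfiler profiler = new MySparkOperatorProfiler(operatorGenerator, configuration, dataGenerator);
* profiler.prepare(1_000_000L);                          // generate and cache the input RDD(s)
* SparkOperatorProfiler.Result result = profiler.run();  // execute the operator and take the measurements
* System.out.println(result.getCsvHeader());
* System.out.println(result.toCsvString());
* profiler.cleanUp();
* }</pre>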
*/
public abstract class SparkOperatorProfiler {
protected final Logger logger = LogManager.getLogger(this.getClass());
protected Supplier<SparkExecutionOperator> operatorGenerator;
protected final List<Supplier<?>> dataQuantumGenerators;
private final String gangliaRrdsDir;
private final String gangliaClusterName;
public int cpuMhz, numMachines, numCoresPerMachine, numPartitions;
private final int dataQuantumGeneratorBatchSize;
private final String dataQuantumGeneratorLocation;
protected final long executionPaddingTime;
protected SparkExecutionOperator operator;
protected SparkExecutor sparkExecutor;
protected final FunctionCompiler functionCompiler = new FunctionCompiler();
protected List<Long> inputCardinalities;
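/**
* Creates a new instance, drawing the profiling settings (number of machines and cores, partitions,
* data generation batch size and location, execution padding time, and Ganglia RRD locations) from the
* given {@link Configuration}.
*
* @param operatorGenerator     creates instances of the {@link SparkExecutionOperator} to be profiled
* @param configuration         provides the profiling settings and their defaults
* @param dataQuantumGenerators generate data quanta for each input of the profiled operator
*/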
public SparkOperatorProfiler(Supplier<SparkExecutionOperator> operatorGenerator,
Configuration configuration,
Supplier<?>... dataQuantumGenerators) {
this.operatorGenerator = operatorGenerator;
this.dataQuantumGenerators = Arrays.asList(dataQuantumGenerators);
this.cpuMhz = (int) configuration.getLongProperty("wayang.spark.cpu.mhz", 2700);
this.numMachines = (int) configuration.getLongProperty("wayang.spark.machines", 1);
this.numCoresPerMachine = (int) configuration.getLongProperty("wayang.spark.cores-per-machine", 1);
this.numPartitions = (int) configuration.getLongProperty("wayang.spark.partitions", -1);
this.gangliaRrdsDir = configuration.getStringProperty("wayang.ganglia.rrds", "/var/lib/ganglia/rrds");
this.gangliaClusterName = configuration.getStringProperty("wayang.ganglia.cluster", "cluster");
this.dataQuantumGeneratorBatchSize = (int) configuration.getLongProperty("wayang.profiler.datagen.batchsize", 5000000);
this.dataQuantumGeneratorLocation = configuration.getStringProperty("wayang.profiler.datagen.location", "worker");
this.executionPaddingTime = configuration.getLongProperty("wayang.profiler.execute.padding", 5000);
}
/**
* Call this method before {@link #run()} to prepare the profiling run.
*
* @param inputCardinalities number of input elements for each input of the profiled operator
*/
public void prepare(long... inputCardinalities) {
this.operator = this.operatorGenerator.get();
this.inputCardinalities = WayangArrays.asList(inputCardinalities);
this.sparkExecutor = ProfilingUtils.fakeSparkExecutor(ReflectionUtils.getDeclaringJar(SparkOperatorProfiler.class));
for (int inputIndex = 0; inputIndex < inputCardinalities.length; inputIndex++) {
long inputCardinality = inputCardinalities[inputIndex];
this.prepareInput(inputIndex, inputCardinality);
}
}
protected abstract void prepareInput(int inputIndex, long inputCardinality);
/**
* Helper method to generate data quanta and provide them as a cached {@link JavaRDD}. Whether the data is
* generated on the driver or on a worker is determined by the {@code wayang.profiler.datagen.location}
* property, which accepts {@code "worker"} (default) and {@code "driver"}.
*/
protected <T> JavaRDD<T> prepareInputRdd(long cardinality, int inputIndex) {
switch (this.dataQuantumGeneratorLocation) {
case "worker":
return this.prepareInputRddInWorker(cardinality, inputIndex);
case "driver":
return this.prepareInputRddInDriver(cardinality, inputIndex);
default:
this.logger.error("In correct data generation location (is: {}, allowed: worker/driver). Using worker.");
return this.prepareInputRddInWorker(cardinality, inputIndex);
}
}
/**
* Helper method that generates data quanta on the driver, parallelizes them batch-wise, and provides them
* as a cached {@link JavaRDD}.
*/
protected <T> JavaRDD<T> prepareInputRddInDriver(long cardinality, int inputIndex) {
@SuppressWarnings("unchecked")
final Supplier<T> supplier = (Supplier<T>) this.dataQuantumGenerators.get(inputIndex);
JavaRDD<T> finalInputRdd = null;
// Create batches, parallelize them, and union them.
long remainder = cardinality;
do {
int batchSize = (int) Math.min(remainder, this.dataQuantumGeneratorBatchSize);
List<T> batch = new ArrayList<>(batchSize);
while (batch.size() < batchSize) {
batch.add(supplier.get());
}
final JavaRDD<T> batchRdd = this.sparkExecutor.sc.parallelize(batch);
finalInputRdd = finalInputRdd == null ? batchRdd : finalInputRdd.union(batchRdd);
remainder -= batchSize;
} while (remainder > 0);
// Shuffle and cache the RDD.
final JavaRDD<T> cachedInputRdd = this.partition(finalInputRdd).cache();
// Run a no-op action to force evaluation, so that the RDD is actually materialized in the cache.
cachedInputRdd.foreach(dataQuantum -> {
});
return cachedInputRdd;
}
/**
* Helper method that generates data quanta within a Spark job (i.e., on a worker) and provides them as a
* cached {@link JavaRDD}.
*/
protected <T> JavaRDD<T> prepareInputRddInWorker(long cardinality, int inputIndex) {
// Create batches, parallelize them, and union them.
final List<Integer> batchSizes = new LinkedList<>();
int numFullBatches = (int) (cardinality / this.dataQuantumGeneratorBatchSize);
for (int i = 0; i < numFullBatches; i++) {
batchSizes.add(this.dataQuantumGeneratorBatchSize);
}
batchSizes.add((int) (cardinality % this.dataQuantumGeneratorBatchSize));
@SuppressWarnings("unchecked")
final Supplier<T> supplier = (Supplier<T>) this.dataQuantumGenerators.get(inputIndex);
JavaRDD<T> finalInputRdd = this.sparkExecutor.sc
.parallelize(batchSizes, 1) // Single partition to ensure the same data generator.
.flatMap(batchSize -> {
List<T> list = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
list.add(supplier.get());
}
return list.iterator();
});
// Shuffle and cache the RDD.
final JavaRDD<T> cachedInputRdd = this.partition(finalInputRdd).cache();
// Run a no-op action to force evaluation, so that the RDD is actually materialized in the cache.
cachedInputRdd.foreach(dataQuantum -> {
});
return cachedInputRdd;
}
/**
* Enforces the configured number of partitions ({@code wayang.spark.partitions}) on the given {@link JavaRDD}, if requested.
*/
protected <T> JavaRDD<T> partition(JavaRDD<T> rdd) {
// Note that coalesce(..., shuffle = true) can both increase and decrease the number of partitions.
return this.numPartitions == -1 ? rdd : rdd.coalesce(this.numPartitions, true);
}
/**
* Executes the profiling task and gathers the measurements. Requires a previous call to {@link #prepare(long...)}.
*/
public Result run() {
final Result result = this.executeOperator();
this.sparkExecutor.dispose();
this.sparkExecutor = null;
return result;
}
/**
* Estimates the CPU cycles spent in the cluster during the given time span by waiting for Ganglia to provide
* the respective information in its RRD files.
*/
protected long provideCpuCycles(long startTime, long endTime) {
// Find out the average idle fraction in the cluster.
final double sumCpuIdleRatio = this.waitAndQueryMetricAverage("cpu_idle", "sum", startTime, endTime);
final double numCpuIdleRatio = this.waitAndQueryMetricAverage("cpu_idle", "num", startTime, endTime);
final double avgCpuIdleRatio = sumCpuIdleRatio / numCpuIdleRatio / 100;
// Determine number of cycles per millisecond.
long passedMillis = endTime - startTime;
double cyclesPerMillis = this.cpuMhz * 1e3 * this.numCoresPerMachine * this.numMachines;
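// Illustrative figures (assumptions, not measurements): 2700 MHz * 1e3 cycles/ms/MHz * 4 cores * 1 machine
// = 1.08e7 cycles/ms; over a 10,000 ms span with an average idle ratio of 0.8 this estimates
// 10,000 * 1.08e7 * (1 - 0.8), i.e. roughly 2.16e10 spent cycles.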
// Estimate the number of spent CPU cycles in the cluster.
return Math.round(passedMillis * cyclesPerMillis * (1 - avgCpuIdleRatio));
}
/**
* Estimates the bytes transferred over the network in the cluster during the given time span by waiting for Ganglia to provide
* the respective information in its RRD files.
*/
protected long provideNetworkBytes(long startTime, long endTime) {
// Find out the average received/transmitted bytes per second.
final double transmittedBytesPerSec = this.waitAndQueryMetricAverage("tx_bytes_eth0", "sum", startTime, endTime);
final double receivedBytesPerSec = this.waitAndQueryMetricAverage("rx_bytes_eth0", "sum", startTime, endTime);
final double bytesPerSec = (transmittedBytesPerSec + receivedBytesPerSec) / 2;
// Estimate the number of actually communicated bytes (the timestamps are in milliseconds, hence the division by 1,000).
return (long) (bytesPerSec / 1000 * (endTime - startTime));
}
/**
* Estimates the bytes read from and written to disk in the cluster during the given time span by waiting for Ganglia to provide
* the respective information in its RRD files.
*/
protected long provideDiskBytes(long startTime, long endTime) {
// Find out the average read/written bytes per second.
final double readBytesPerSec = this.waitAndQueryMetricAverage("diskstat_sda_read_bytes_per_sec", "sum", startTime, endTime);
final double writeBytesPerSec = this.waitAndQueryMetricAverage("diskstat_sda_write_bytes_per_sec", "sum", startTime, endTime);
final double bytesPerSec = readBytesPerSec + writeBytesPerSec;
// Estimate the number of actually read/written bytes (the timestamps are in milliseconds, hence the division by 1,000).
return (long) (bytesPerSec / 1000 * (endTime - startTime));
}
/**
* Queries an average metric from a Ganglia RRD file. If the metric is not recent enough, this method waits
* until the requested data points are available.
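* <p>With the default configuration, for instance, the {@code cpu_idle} metric is read from
* {@code /var/lib/ganglia/rrds/cluster/__SummaryInfo__/cpu_idle.rrd}.</p>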
*/
private double waitAndQueryMetricAverage(String metric, String dataSeries, long startTime, long endTime) {
final String rrdFile = this.gangliaRrdsDir + File.separator +
this.gangliaClusterName + File.separator +
"__SummaryInfo__" + File.separator +
metric + ".rrd";
double metricValue = Double.NaN;
int numAttempts = 0;
do {
if (numAttempts++ > 0) {
ProfilingUtils.sleep(5000);
}
try (RrdAccessor rrdAccessor = RrdAccessor.open(rrdFile)) {
final long lastUpdateMillis = rrdAccessor.getLastUpdateMillis();
if (lastUpdateMillis >= endTime) {
metricValue = rrdAccessor.query(dataSeries, startTime, endTime, ConsolFun.AVERAGE);
} else {
this.logger.info("Last RRD file update is only from {} ({} attempts so far).", new Date(lastUpdateMillis), numAttempts);
}
} catch (Exception e) {
this.logger.error(String.format("Could not access RRD %s.", rrdFile), e);
return Double.NaN;
}
} while (Double.isNaN(metricValue));
return metricValue;
}
/**
* Executes the profiling task. Requires that this instance is prepared.
*/
protected abstract Result executeOperator();
/**
* Utility method to invoke
* {@link SparkExecutionOperator#evaluate(ChannelInstance[], ChannelInstance[], SparkExecutor, OptimizationContext.OperatorContext)}.
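* <p>A sketch of how a subclass's {@link #executeOperator()} might use this helper; {@code inputRdd} is a
* hypothetical, previously prepared input:</p>
* <pre>{@code
* RddChannel.Instance input = createChannelInstance(inputRdd, this.sparkExecutor);
* RddChannel.Instance output = createChannelInstance(this.sparkExecutor);
* this.evaluate(this.operator, new ChannelInstance[]{input}, new ChannelInstance[]{output});
* }</pre>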
*/
protected void evaluate(SparkExecutionOperator operator,
ChannelInstance[] inputs,
ChannelInstance[] outputs) {
OptimizationContext optimizationContext = new DefaultOptimizationContext(this.sparkExecutor.getJob());
final OptimizationContext.OperatorContext operatorContext = optimizationContext.addOneTimeOperator(operator);
operator.evaluate(inputs, outputs, this.sparkExecutor, operatorContext);
}
/**
* Creates a {@link ChannelInstance} that carries the given {@code rdd}.
*/
protected static RddChannel.Instance createChannelInstance(final JavaRDD<?> rdd, SparkExecutor sparkExecutor) {
final RddChannel.Instance channelInstance = createChannelInstance(sparkExecutor);
channelInstance.accept(rdd, sparkExecutor);
return channelInstance;
}
/**
* Creates an empty {@link ChannelInstance} for a cached {@link RddChannel}.
*/
protected static RddChannel.Instance createChannelInstance(SparkExecutor sparkExecutor) {
final ChannelDescriptor channelDescriptor = RddChannel.CACHED_DESCRIPTOR;
final RddChannel channel = (RddChannel) channelDescriptor.createChannel(null, sparkExecutor.getConfiguration());
return (RddChannel.Instance) channel.createInstance(null, null, -1);
}
/**
* Override this method to implement any clean-up logic.
*/
public void cleanUp() {
}
/**
* The result of a single profiling run.
*/
public static class Result {
private final List<Long> inputCardinalities;
private final int numMachines, numCoresPerMachine;
private final long outputCardinality;
private final long diskBytes, networkBytes;
private final long cpuCycles;
private final long wallclockMillis;
public Result(List<Long> inputCardinalities, long outputCardinality,
long wallclockMillis, long diskBytes, long networkBytes, long cpuCycles,
int numMachines, int numCoresPerMachine) {
this.inputCardinalities = inputCardinalities;
this.outputCardinality = outputCardinality;
this.wallclockMillis = wallclockMillis;
this.diskBytes = diskBytes;
this.networkBytes = networkBytes;
this.cpuCycles = cpuCycles;
this.numMachines = numMachines;
this.numCoresPerMachine = numCoresPerMachine;
}
public List<Long> getInputCardinalities() {
return this.inputCardinalities;
}
public long getOutputCardinality() {
return this.outputCardinality;
}
public long getDiskBytes() {
return this.diskBytes;
}
public long getNetworkBytes() {
return this.networkBytes;
}
public long getCpuCycles() {
return this.cpuCycles;
}
@Override
public String toString() {
return "Result{" +
"inputCardinalities=" + inputCardinalities +
", outputCardinality=" + outputCardinality +
", numMachines=" + numMachines +
", numCoresPerMachine=" + numCoresPerMachine +
", wallclockMillis=" + wallclockMillis +
", cpuCycles=" + cpuCycles +
", diskBytes=" + diskBytes +
", networkBytes=" + networkBytes +
'}';
}
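/**
* Provides a CSV header that matches {@link #toCsvString()}; with a single input, for instance, it reads
* {@code input_card_0,output_card,wallclock,disk,network,cpu,machines,cores_per_machine}.
*/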
public String getCsvHeader() {
return String.join(",", WayangCollections.map(this.inputCardinalities, (index, card) -> "input_card_" + index)) + "," +
"output_card," +
"wallclock," +
"disk," +
"network," +
"cpu," +
"machines," +
"cores_per_machine";
}
public String toCsvString() {
return String.join(",", WayangCollections.map(this.inputCardinalities, Object::toString)) + ","
+ this.outputCardinality + ","
+ this.wallclockMillis + ","
+ this.diskBytes + ","
+ this.networkBytes + ","
+ this.cpuCycles + ","
+ this.numMachines + ","
+ this.numCoresPerMachine;
}
}
}