/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.wayang.profiler.spark;

import org.apache.wayang.commons.util.profiledb.instrumentation.StopWatch;
import org.apache.wayang.commons.util.profiledb.model.Experiment;
import org.apache.wayang.commons.util.profiledb.model.Subject;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.profiler.data.DataGenerators;
import org.apache.wayang.spark.platform.SparkPlatform;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Starts a profiling run of Spark.
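 * <p>Expects the operator to profile as the first argument, followed by one or more
 * comma-separated cardinality lists, e.g. {@code map 1000,10000} (illustrative values;
 * see the usage message below).</p>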
 */
public class Main {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.printf("Usage: java %s <operator to profile> [<cardinality n>[,<cardinality n>]*]+\n", Main.class.getName());
            System.exit(1);
        }

        String operator = args[0];
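
        // Each remaining argument is a comma-separated list of cardinalities; the cross
        // product of these lists determines the input sizes that are profiled.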
        List<List<Long>> allCardinalities = new LinkedList<>();
        for (int i = 1; i < args.length; i++) {
            List<Long> cardinalities = Arrays.stream(args[i].split(",")).map(Long::valueOf).collect(Collectors.toList());
            allCardinalities.add(cardinalities);
        }
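
        // Select and run the profiler that matches the requested operator.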
        List<SparkOperatorProfiler.Result> results;
        switch (operator) {
            case "textsource":
                results = profile(OperatorProfilers.createSparkTextFileSourceProfiler(), allCardinalities);
                break;
            case "collectionsource":
                results = profile(OperatorProfilers.createSparkCollectionSourceProfiler(), allCardinalities);
                break;
            case "map":
                results = profile(OperatorProfilers.createSparkMapProfiler(), allCardinalities);
                break;
            case "filter":
                results = profile(OperatorProfilers.createSparkFilterProfiler(), allCardinalities);
                break;
            case "flatmap":
                results = profile(OperatorProfilers.createSparkFlatMapProfiler(), allCardinalities);
                break;
            case "reduce":
                results = profile(OperatorProfilers.createSparkReduceByProfiler(), allCardinalities);
                break;
            case "globalreduce":
                results = profile(OperatorProfilers.createSparkGlobalReduceProfiler(), allCardinalities);
                break;
            case "distinct":
            case "distinct-string":
                results = profile(OperatorProfilers.createSparkDistinctProfiler(), allCardinalities);
                break;
            case "distinct-integer":
                results = profile(OperatorProfilers.createSparkDistinctProfiler(
                        DataGenerators.createReservoirBasedIntegerSupplier(new ArrayList<>(), 0.7d, new Random(42)),
                        Integer.class,
                        new Configuration()
                ), allCardinalities);
                break;
            case "sort":
            case "sort-string":
                results = profile(OperatorProfilers.createSparkSortProfiler(), allCardinalities);
                break;
            case "sort-integer":
                results = profile(OperatorProfilers.createSparkSortProfiler(
                        DataGenerators.createReservoirBasedIntegerSupplier(new ArrayList<>(), 0.7d, new Random(42)),
                        Integer.class,
                        new Configuration()
                ), allCardinalities);
                break;
            case "count":
                results = profile(OperatorProfilers.createSparkCountProfiler(), allCardinalities);
                break;
            case "groupby":
                results = profile(OperatorProfilers.createSparkMaterializedGroupByProfiler(), allCardinalities);
                break;
            case "join":
                results = profile(OperatorProfilers.createSparkJoinProfiler(), allCardinalities);
                break;
            case "union":
                results = profile(OperatorProfilers.createSparkUnionProfiler(), allCardinalities);
                break;
            case "cartesian":
                results = profile(OperatorProfilers.createSparkCartesianProfiler(), allCardinalities);
                break;
            case "callbacksink":
                results = profile(OperatorProfilers.createSparkLocalCallbackSinkProfiler(), allCardinalities);
                break;
// case "word-count-split": {
// final Supplier<String> randomStringSupplier = DataGenerators.createRandomStringSupplier(2, 10, new Random(42));
// results = profile(
// org.apache.wayang.profiler.java.OperatorProfilers.createJavaFlatMapProfiler(
// () -> String.format("%s %s %s %s %s %s %s %s %s",
// randomStringSupplier.get(), randomStringSupplier.get(),
// randomStringSupplier.get(), randomStringSupplier.get(),
// randomStringSupplier.get(), randomStringSupplier.get(),
// randomStringSupplier.get(), randomStringSupplier.get(),
// randomStringSupplier.get()),
// str -> Arrays.asList(str.split(" ")),
// String.class,
// String.class
// ),
// cardinalities);
// break;
// }
// case "word-count-canonicalize": {
// final Supplier<String> randomStringSupplier = DataGenerators.createRandomStringSupplier(2, 10, new Random(42));
// results = profile(
// org.apache.wayang.profiler.java.OperatorProfilers.createJavaMapProfiler(
// randomStringSupplier,
// word -> new Tuple2<>(word.toLowerCase(), 1),
// String.class,
// Tuple2.class
// ),
// cardinalities
// );
// break;
// }
// case "word-count-count": {
// final Supplier<String> stringSupplier = DataGenerators.createReservoirBasedStringSupplier(new ArrayList<>(), 0.7, new Random(42), 2, 10);
// results = profile(
// org.apache.wayang.profiler.java.OperatorProfilers.createJavaReduceByProfiler(
// () -> new Tuple2<>(stringSupplier.get(), 1),
// pair -> pair.field0,
// (p1, p2) -> {
// p1.field1 += p2.field1;
// return p1;
// },
// cast(Tuple2.class),
// String.class
// ),
// cardinalities
// );
// break;
// }
            default:
                System.err.println("Unknown operator: " + operator);
                return;
        }
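
        // Emit all results as CSV: one header line, then one line per profiled configuration.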
        System.out.println();
        System.out.println(WayangCollections.getAny(results).getCsvHeader());
        results.forEach(result -> System.out.println(result.toCsvString()));
    }
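
    /**
     * Creates a {@link StopWatch} for a fresh profiling {@link Experiment}.
     */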
    private static StopWatch createStopWatch() {
        Experiment experiment = new Experiment("wayang-profiler", new Subject("Wayang", "0.1"));
        return new StopWatch(experiment);
    }

    /**
     * Run the {@code opProfiler} with all combinations that can be derived from {@code allCardinalities}.
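     * <p>For example, the cardinality lists {@code [[10, 100], [1000]]} yield the combinations
     * {@code (10, 1000)} and {@code (100, 1000)}.</p>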
     */
    private static List<SparkOperatorProfiler.Result> profile(SparkOperatorProfiler opProfiler,
                                                              List<List<Long>> allCardinalities) {
        return StreamSupport.stream(WayangCollections.streamedCrossProduct(allCardinalities).spliterator(), false)
                .map(cardinalities -> profile(opProfiler, WayangArrays.toArray(cardinalities)))
                .collect(Collectors.toList());
    }

    /**
     * Run the {@code opProfiler} with the given {@code cardinalities}.
     */
    private static SparkOperatorProfiler.Result profile(SparkOperatorProfiler opProfiler, long... cardinalities) {
        System.out.printf("Profiling %s with %s data quanta.\n", opProfiler, WayangArrays.asList(cardinalities));
        final StopWatch stopWatch = createStopWatch();
        SparkOperatorProfiler.Result result = null;
        try {
            System.out.println("Prepare...");
            final TimeMeasurement preparation = stopWatch.start("Preparation");
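            // Warm up the Spark platform during preparation so that start-up costs
            // do not distort the execution measurement.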
            SparkPlatform.getInstance().warmUp(new Configuration());
            opProfiler.prepare(cardinalities);
            preparation.stop();

            System.out.println("Execute...");
            final TimeMeasurement execution = stopWatch.start("Execution");
            result = opProfiler.run();
            execution.stop();
        } finally {
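            // Clean up even if preparation or execution failed, so that temporary data does not linger.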
System.out.println("Clean up...");
final TimeMeasurement cleanUp = stopWatch.start("Clean up");
opProfiler.cleanUp();
cleanUp.stop();
System.out.println("Measurement:");
if (result != null) System.out.println(result);
System.out.println(stopWatch.toPrettyString());
System.out.println();
}
return result;
}
}