blob: f65a8d47554403fb1e0f59d53c0b4c5a7d83d03c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.nexmark;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* An implementation of the 'NEXMark queries' for Beam. These are multiple queries over a
* three-table schema representing an online auction system:
*
* <ul>
* <li>{@link Person} represents a person submitting an item for auction and/or making a bid on an
* auction.
* <li>{@link Auction} represents an item under auction.
* <li>{@link Bid} represents a bid for an item under auction.
* </ul>
*
* <p>The queries exercise many aspects of the Beam model.
*
* <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
* particularly sensible.
*
* <p>See <a
* href="https://web.archive.org/web/20100620010601/http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
* Nexmark website</a>
*/
public class Main {
  /** Pairs a configuration with the performance measured when running it. */
  private static class Result {
    private final NexmarkConfiguration configuration;
    private final NexmarkPerf perf;

    private Result(NexmarkConfiguration configuration, NexmarkPerf perf) {
      this.configuration = configuration;
      this.perf = perf;
    }
  }

  /** Task that runs a single configuration through a {@link NexmarkLauncher}. */
  private static class Run implements Callable<Result> {
    private final NexmarkLauncher<NexmarkOptions> nexmarkLauncher;
    private final NexmarkConfiguration configuration;

    private Run(NexmarkOptions options, NexmarkConfiguration configuration) {
      this.nexmarkLauncher = new NexmarkLauncher<>(options, configuration);
      this.configuration = configuration;
    }

    @Override
    public Result call() throws IOException {
      NexmarkPerf perf = nexmarkLauncher.run();
      return new Result(configuration, perf);
    }
  }

  /**
   * Runs every configuration of the selected suite and reports the collected performance.
   *
   * <p>Configurations are submitted to a fixed thread pool and their results are collected in
   * completion order. Each result is appended to the perf file and a running summary is printed.
   * When requested, the results are also exported to BigQuery, and the final summary and
   * javascript files are written.
   *
   * @param args command-line arguments, parsed into {@link NexmarkOptions}
   * @throws IOException if a run failed with an {@link IOException}
   * @throws RuntimeException if a run failed with any other exception, reported errors, or
   *     result collection was interrupted before every configuration completed
   */
  void runAll(String[] args) throws IOException {
    NexmarkOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
    Instant start = Instant.now();
    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
    Set<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
    // newFixedThreadPool rejects a size <= 0, which an empty suite (or a non-positive
    // parallelism option) would otherwise produce.
    int nThreads = Math.max(1, Math.min(options.getNexmarkParallel(), configurations.size()));
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    CompletionService<Result> completion = new ExecutorCompletionService<>(executor);
    boolean successful = true;
    boolean interrupted = false;
    try {
      // Schedule all the configurations. Each run gets its own options instance parsed from
      // args rather than sharing the validated instance above.
      for (NexmarkConfiguration configuration : configurations) {
        NexmarkOptions optionsCopy = PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class);
        completion.submit(new Run(optionsCopy, configuration));
      }
      // Collect all the results, in completion order.
      for (int scheduled = configurations.size(); scheduled > 0; scheduled--) {
        Result result;
        try {
          result = completion.take().get();
        } catch (InterruptedException e) {
          // Stop waiting. Not every configuration completed, so the run cannot count as
          // successful. The interrupt status is restored only after the reporting below,
          // because file writes go through interruptible channels that would otherwise fail.
          interrupted = true;
          successful = false;
          break;
        } catch (ExecutionException e) {
          Throwable t = e.getCause();
          if (t instanceof IOException) {
            throw new IOException(t);
          } else {
            throw new RuntimeException(t);
          }
        }
        NexmarkConfiguration configuration = result.configuration;
        NexmarkPerf perf = result.perf;
        if (perf == null) {
          // No perf was produced for this configuration; it is reported as "not run".
          continue;
        }
        if (perf.errors == null || !perf.errors.isEmpty()) {
          successful = false;
        }
        appendPerf(options.getPerfFilename(), configuration, perf);
        actual.put(configuration, perf);
        // Summarize what we've run so far (stdout only).
        saveSummary(null, configurations, actual, baseline, start, options);
      }
      if (options.getExportSummaryToBigQuery()) {
        ImmutableMap<String, String> schema =
            ImmutableMap.<String, String>builder()
                .put("timestamp", "timestamp")
                .put("runtimeSec", "float")
                .put("eventsPerSec", "float")
                .put("numResults", "integer")
                .build();
        savePerfsToBigQuery(
            BigQueryResultsPublisher.create(options.getBigQueryDataset(), schema),
            options,
            actual,
            start);
      }
    } finally {
      try {
        if (options.getMonitorJobs()) {
          // Report overall performance.
          saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start, options);
          saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
        }
      } finally {
        // Always shut the pool down, even if reporting failed. Its threads are non-daemon and
        // would otherwise keep the JVM alive.
        executor.shutdown();
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
    if (!successful) {
      throw new RuntimeException("Execution was not successful");
    }
  }

  /**
   * Publishes each perf in {@code perfs} through {@code publisher}.
   *
   * <p>The table name is derived from the query's full name (which includes the query language),
   * and every row is stamped with {@code start}.
   */
  @VisibleForTesting
  static void savePerfsToBigQuery(
      BigQueryResultsPublisher publisher,
      NexmarkOptions options,
      Map<NexmarkConfiguration, NexmarkPerf> perfs,
      Instant start) {
    for (Map.Entry<NexmarkConfiguration, NexmarkPerf> entry : perfs.entrySet()) {
      String queryName =
          NexmarkUtils.fullQueryName(
              options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
      String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);
      publisher.publish(entry.getValue(), tableName, start.getMillis());
    }
  }

  /**
   * Appends the pair of {@code configuration} and {@code perf} to the perf file.
   *
   * <p>Each entry is a blank line, two {@code #}-prefixed comment lines (timestamp and short
   * description), the configuration line and the perf line. {@link #loadBaseline} reads back
   * files in this format. Does nothing if {@code perfFilename} is null.
   */
  private void appendPerf(
      @Nullable String perfFilename, NexmarkConfiguration configuration, NexmarkPerf perf) {
    if (perfFilename == null) {
      return;
    }
    List<String> lines = new ArrayList<>();
    lines.add("");
    lines.add(String.format("# %s", Instant.now()));
    lines.add(String.format("# %s", configuration.toShortString()));
    lines.add(configuration.toString());
    lines.add(perf.toString());
    try {
      Files.write(
          Paths.get(perfFilename),
          lines,
          StandardCharsets.UTF_8,
          StandardOpenOption.CREATE,
          StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new RuntimeException("Unable to write perf file: ", e);
    }
    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
  }

  /**
   * Loads the baseline perf from a file in the format written by {@link #appendPerf}.
   *
   * <p>Blank lines and lines starting with {@code #} are skipped. Every other line is parsed as
   * a configuration, and the line immediately after it is parsed as its perf.
   *
   * @return the baseline keyed by configuration, or null if {@code baselineFilename} is null
   * @throws RuntimeException if the file cannot be read
   * @throws IllegalArgumentException if a configuration line has no following perf line
   */
  @Nullable
  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
      @Nullable String baselineFilename) {
    if (baselineFilename == null) {
      return null;
    }
    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
    List<String> lines;
    try {
      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new RuntimeException("Unable to read baseline perf file: ", e);
    }
    int i = 0;
    while (i < lines.size()) {
      String line = lines.get(i++);
      if (line.startsWith("#") || line.trim().isEmpty()) {
        continue;
      }
      // A configuration line must be followed by its perf line.
      if (i >= lines.size()) {
        throw new IllegalArgumentException(
            String.format(
                "Baseline perf file %s ends with a configuration line but no perf line",
                baselineFilename));
      }
      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(line);
      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i++));
      baseline.put(configuration, perf);
    }
    NexmarkUtils.console(
        "loaded %d entries from baseline file %s.", baseline.size(), baselineFilename);
    return baseline;
  }

  private static final String LINE =
      "==========================================================================================";

  /**
   * Prints a summary of {@code actual} vs (if non-null) {@code baseline} to stdout.
   *
   * <p>The summary is also appended to {@code summaryFilename} if that is non-null. Baseline
   * columns show the percentage difference for runtime and event rate, and the absolute
   * difference for the result count.
   */
  private static void saveSummary(
      @Nullable String summaryFilename,
      Iterable<NexmarkConfiguration> configurations,
      Map<NexmarkConfiguration, NexmarkPerf> actual,
      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
      Instant start,
      NexmarkOptions options) {
    List<String> lines = new ArrayList<>();
    lines.add("");
    lines.add(LINE);
    lines.add(
        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
    lines.add("");
    lines.add("Default configuration:");
    lines.add(NexmarkConfiguration.DEFAULT.toString());
    lines.add("");
    lines.add("Configurations:");
    lines.add(" Conf Description");
    int conf = 0;
    for (NexmarkConfiguration configuration : configurations) {
      lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
      NexmarkPerf actualPerf = actual.get(configuration);
      if (actualPerf != null && actualPerf.jobId != null) {
        lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
      }
    }
    lines.add("");
    lines.add("Performance:");
    lines.add(
        String.format(
            " %4s %12s %12s %12s %12s %12s %12s",
            "Conf",
            "Runtime(sec)",
            "(Baseline)",
            "Events(/sec)",
            "(Baseline)",
            "Results",
            "(Baseline)"));
    conf = 0;
    for (NexmarkConfiguration configuration : configurations) {
      String line = String.format(" %04d ", conf++);
      NexmarkPerf actualPerf = actual.get(configuration);
      if (actualPerf == null) {
        line += "*** not run ***";
      } else {
        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
        double runtimeSec = actualPerf.runtimeSec;
        line += String.format("%12.1f ", runtimeSec);
        if (baselinePerf == null) {
          line += String.format("%12s ", "");
        } else {
          double baselineRuntimeSec = baselinePerf.runtimeSec;
          double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
          line += String.format("%+11.2f%% ", diff);
        }
        double eventsPerSec = actualPerf.eventsPerSec;
        line += String.format("%12.1f ", eventsPerSec);
        if (baselinePerf == null) {
          line += String.format("%12s ", "");
        } else {
          double baselineEventsPerSec = baselinePerf.eventsPerSec;
          double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
          line += String.format("%+11.2f%% ", diff);
        }
        long numResults = actualPerf.numResults;
        line += String.format("%12d ", numResults);
        if (baselinePerf == null) {
          line += String.format("%12s", "");
        } else {
          long baselineNumResults = baselinePerf.numResults;
          long diff = numResults - baselineNumResults;
          line += String.format("%+12d", diff);
        }
      }
      lines.add(line);
      if (actualPerf != null) {
        List<String> errors = actualPerf.errors;
        if (errors == null) {
          errors = new ArrayList<>();
          errors.add("NexmarkGoogleRunner returned null errors list");
        }
        for (String error : errors) {
          lines.add(String.format(" %4s *** %s ***", "", error));
        }
      }
    }
    lines.add(LINE);
    lines.add("");
    for (String line : lines) {
      System.out.println(line);
    }
    if (summaryFilename != null) {
      try {
        Files.write(
            Paths.get(summaryFilename),
            lines,
            StandardCharsets.UTF_8,
            StandardOpenOption.CREATE,
            StandardOpenOption.APPEND);
      } catch (IOException e) {
        throw new RuntimeException("Unable to save summary file: ", e);
      }
      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
    }
  }

  /**
   * Writes all perf data and any baselines to a javascript file, for use by a graphing page or
   * similar.
   *
   * <p>The file is overwritten; it contains a single {@code all} array with one entry per
   * configuration. Does nothing if {@code javascriptFilename} is null.
   */
  private static void saveJavascript(
      @Nullable String javascriptFilename,
      Iterable<NexmarkConfiguration> configurations,
      Map<NexmarkConfiguration, NexmarkPerf> actual,
      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
      Instant start) {
    if (javascriptFilename == null) {
      return;
    }
    List<String> lines = new ArrayList<>();
    lines.add(
        String.format(
            "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
    lines.add("var all = [");
    for (NexmarkConfiguration configuration : configurations) {
      lines.add(" {");
      lines.add(String.format(" config: %s", configuration));
      NexmarkPerf actualPerf = actual.get(configuration);
      if (actualPerf != null) {
        lines.add(String.format(" ,perf: %s", actualPerf));
      }
      NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
      if (baselinePerf != null) {
        lines.add(String.format(" ,baseline: %s", baselinePerf));
      }
      lines.add(" },");
    }
    lines.add("];");
    try {
      Files.write(
          Paths.get(javascriptFilename),
          lines,
          StandardCharsets.UTF_8,
          StandardOpenOption.CREATE,
          StandardOpenOption.TRUNCATE_EXISTING);
    } catch (IOException e) {
      throw new RuntimeException("Unable to save javascript file: ", e);
    }
    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
  }

  /** Command-line entry point: runs the configured Nexmark suite. */
  public static void main(String[] args) throws IOException {
    new Main().runAll(args);
  }
}