/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.tpcds;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
import org.apache.commons.csv.CSVFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes TPC-DS queries as Beam jobs built from PCollections and SqlTransform; each query is run
 * through SqlTransform.query.
 */
public class SqlTransformRunner {
private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
private static final List<String> SUMMARY_HEADERS_LIST =
Arrays.asList(
"Query Name",
"Job Name",
"Data Size",
"Dialect",
"Status",
"Start Time",
"End Time",
"Elapsed Time(sec)");
  private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);

  /**
   * Get all tables needed for a specific query execution, read from either CSV or Parquet sources.
   *
   * @param pipeline The pipeline that will be run to execute the query
   * @param csvFormat The CSV format used to construct the read converter (CsvToRow) and write
   *     converter (RowToCsv)
   * @param queryName The name of the query to be executed (for example: query3, query55, query96)
   * @return A PCollectionTuple containing all tables needed to run the query.
   * @throws Exception if the query or its input data cannot be read
   */
private static PCollectionTuple getTables(
Pipeline pipeline, CSVFormat csvFormat, String queryName) throws Exception {
Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
TpcdsOptions tpcdsOptions = pipeline.getOptions().as(TpcdsOptions.class);
String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
String queryString = QueryReader.readQuery(queryName);
PCollectionTuple tables = PCollectionTuple.empty(pipeline);
for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) {
String tableName = tableSchema.getKey();
      // A table is only added when the query string contains its name, which avoids reading
      // unnecessary data files.
      // TODO: This check is simple but unreliable; a table name can appear as a substring of
      // unrelated text in a query and produce false positives.
if (queryString.contains(tableName)) {
switch (tpcdsOptions.getSourceType()) {
case CSV:
{
PCollection<Row> table =
getTableCSV(pipeline, csvFormat, tpcdsOptions, dataSize, tableSchema, tableName);
tables = tables.and(new TupleTag<>(tableName), table);
break;
}
case PARQUET:
{
PCollection<GenericRecord> table =
getTableParquet(pipeline, tpcdsOptions, dataSize, tableName);
tables = tables.and(new TupleTag<>(tableName), table);
break;
}
default:
throw new IllegalStateException(
"Unexpected source type: " + tpcdsOptions.getSourceType());
}
}
}
return tables;
}
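
  /**
   * Reads one TPC-DS table from the Parquet files under
   * dataDirectory/dataSize/tableName/*.parquet as a PCollection of Avro GenericRecords.
   */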
private static PCollection<GenericRecord> getTableParquet(
Pipeline pipeline, TpcdsOptions tpcdsOptions, String dataSize, String tableName)
throws IOException {
org.apache.avro.Schema schema = getAvroSchema(tableName);
String filepattern =
tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + "/*.parquet";
return pipeline.apply(
"Read " + tableName + " (parquet)",
ParquetIO.read(schema)
.from(filepattern)
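            // Enable splittable reads so large Parquet files can be processed in parallel.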
.withSplit()
// TODO: add .withProjection()
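            // Attach a Beam schema to the output so SqlTransform can query the records directly.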
.withBeamSchemas(true));
}
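
  /**
   * Reads one TPC-DS table from the pipe-delimited dataDirectory/dataSize/tableName.dat file as a
   * PCollection of Rows using the table's Beam schema.
   */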
private static PCollection<Row> getTableCSV(
Pipeline pipeline,
CSVFormat csvFormat,
TpcdsOptions tpcdsOptions,
String dataSize,
Map.Entry<String, Schema> tableSchema,
String tableName) {
    // The location of the CSV (.dat) data files for this table
String filePattern =
tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";
return new TextTable(
tableSchema.getValue(),
filePattern,
new CsvToRow(tableSchema.getValue(), csvFormat),
new RowToCsv(csvFormat))
.buildIOReader(pipeline.begin())
.setCoder(SchemaCoder.of(tableSchema.getValue()))
.setName(tableSchema.getKey());
}
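
  /** Loads the Avro schema for the given table from the schemas_avro/tableName.json resource. */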
private static org.apache.avro.Schema getAvroSchema(String tableName) throws IOException {
String path = "schemas_avro/" + tableName + ".json";
return new org.apache.avro.Schema.Parser()
.parse(Resources.toString(Resources.getResource(path), Charsets.UTF_8));
  }

  /**
   * Print the summary table after all jobs are finished.
   *
   * @param completion The CompletionService from which finished TpcdsRunResults are taken.
   * @param numOfResults The number of results to collect.
   * @throws Exception if waiting for a result is interrupted or a job fails unexpectedly
   */
private static void printExecutionSummary(
CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
List<List<String>> summaryRowsList = new ArrayList<>();
for (int i = 0; i < numOfResults; i++) {
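      // take() blocks until the next pipeline run completes; get() returns its TpcdsRunResult.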
TpcdsRunResult tpcdsRunResult = completion.take().get();
List<String> list = new ArrayList<>();
list.add(tpcdsRunResult.getQueryName());
list.add(tpcdsRunResult.getJobName());
list.add(tpcdsRunResult.getDataSize());
list.add(tpcdsRunResult.getDialect());
      // If the job was not successful, leave the runtime-related fields blank
list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
list.add(
tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
summaryRowsList.add(list);
}
System.out.println(SUMMARY_START);
System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
  }

  /**
   * The default entry point used by BeamTpcds.main. Runs each query using SqlTransform.query().
   *
   * @param args Command line arguments
   * @throws Exception if the pipelines cannot be constructed or their results cannot be collected
   */
public static void runUsingSqlTransform(String[] args) throws Exception {
TpcdsOptions tpcdsOptions =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
String[] queryNames = TpcdsParametersReader.getAndCheckQueryNames(tpcdsOptions);
int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
    // Use an ExecutorService and CompletionService to run multiple query pipelines concurrently.
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
// Make an array of pipelines, each pipeline is responsible for running a corresponding query.
Pipeline[] pipelines = new Pipeline[queryNames.length];
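    // TPC-DS .dat files are pipe-delimited and use empty strings for NULL values.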
CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
    // Execute all queries, transform each result into a PCollection<String>, and write it as a
    // text file to the configured results directory.
for (int i = 0; i < queryNames.length; i++) {
// For each query, get a copy of pipelineOptions from command line arguments.
TpcdsOptions tpcdsOptionsCopy =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
      // Set a unique job name using the current timestamp so that multiple pipelines can run
      // concurrently.
tpcdsOptionsCopy.setJobName(queryNames[i] + "result" + System.currentTimeMillis());
pipelines[i] = Pipeline.create(tpcdsOptionsCopy);
String queryString = QueryReader.readQuery(queryNames[i]);
PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNames[i]);
try {
tables
.apply(SqlTransform.query(queryString))
.apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString))
.apply(
TextIO.write()
.to(
tpcdsOptions.getResultsDirectory()
+ "/"
+ dataSize
+ "/"
+ pipelines[i].getOptions().getJobName())
.withSuffix(".txt")
.withNumShards(1));
} catch (Exception e) {
        LOG.error("{} failed to execute", queryNames[i], e);
}
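      // Submit the pipeline run to the completion service; results are collected in
      // printExecutionSummary.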
completion.submit(new TpcdsRun(pipelines[i]));
}
executor.shutdown();
printExecutionSummary(completion, queryNames.length);
}
}