[BEAM-9891] Generate query execution summary table after finishing jobs (#12601)
* [BEAM-9891] Generate query execution summary table after finishing jobs
* Print error message using LOG, check PipelineResult's state
Co-authored-by: Yuwei Fu <fuyuwei@google.com>
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index cbc0577..f94b748 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -20,7 +20,7 @@
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -35,19 +35,27 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
*/
public class BeamSqlEnvRunner {
- private static final String dataDirectory = "gs://beamsql_tpcds_1/data";
- private static final String resultDirectory = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+ private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+ private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
+
+ private static final Logger Log = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
private static String buildTableCreateStatement(String tableName) {
String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
@@ -55,7 +63,7 @@
}
private static String buildDataLocation(String dataSize, String tableName) {
- String dataLocation = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+ String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
return dataLocation;
}
@@ -78,7 +86,7 @@
Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
for (String tableName : schemaMap.keySet()) {
- String dataLocation = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+ String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
Schema tableSchema = schemaMap.get(tableName);
Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
inMemoryMetaStore.createTable(table);
@@ -86,6 +94,32 @@
}
/**
+ * Print the summary table after all jobs are finished.
+ * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+ * @param numOfResults The number of results in the collection.
+ * @throws Exception
+ */
+ private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+ List<List<String>> summaryRowsList = new ArrayList<>();
+ for (int i = 0; i < numOfResults; i++) {
+ TpcdsRunResult tpcdsRunResult = completion.take().get();
+ List<String> list = new ArrayList<>();
+ list.add(tpcdsRunResult.getQueryName());
+ list.add(tpcdsRunResult.getJobName());
+ list.add(tpcdsRunResult.getDataSize());
+ list.add(tpcdsRunResult.getDialect());
+ list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+ summaryRowsList.add(list);
+ }
+
+ System.out.println(SUMMARY_START);
+ System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+ }
+
+ /**
* This is the alternative method in BeamTpcds.main method. Run job using BeamSqlEnv.parseQuery() method. (Doesn't perform well when running query96).
* @param args Command line arguments
* @throws Exception
@@ -102,7 +136,7 @@
// Using ExecutorService and CompletionService to fulfill multi-threading functionality
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+ CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
// Directly create all tables and register them into inMemoryMetaStore before creating BeamSqlEnv object.
registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
@@ -130,18 +164,25 @@
pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
String queryString = QueryReader.readQuery(queryNameArr[i]);
- // Query execution
- PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+ try {
+ // Query execution
+ PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
- // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
- PCollection<String> rowStrings = rows.apply(MapElements
- .into(TypeDescriptors.strings())
- .via((Row row) -> row.toString()));
- rowStrings.apply(TextIO.write().to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
+ // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
+ PCollection<String> rowStrings = rows.apply(MapElements
+ .into(TypeDescriptors.strings())
+ .via((Row row) -> row.toString()));
+ rowStrings.apply(TextIO.write().to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
+ } catch (Exception e) {
+ Log.error("{} failed to execute", queryNameArr[i]);
+ e.printStackTrace();
+ }
completion.submit(new TpcdsRun(pipelines[i]));
}
executor.shutdown();
+
+ printExecutionSummary(completion, queryNameArr.length);
}
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 2614ce2..a40cd12 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -19,7 +19,6 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
@@ -30,18 +29,28 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.*;
import org.apache.commons.csv.CSVFormat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class executes jobs using PCollection and SqlTransform, it uses SqlTransform.query to run queries.
*/
public class SqlTransformRunner {
- private static final String dataDirectory = "gs://beamsql_tpcds_1/data";
- private static final String resultDirectory = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+ private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+ private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:";
+ private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
+
+ private static final Logger Log = LoggerFactory.getLogger(SqlTransform.class);
/**
* Get all tables (in the form of TextTable) needed for a specific query execution
@@ -64,7 +73,7 @@
// Only when queryString contains tableName, the table is relevant to this query and will be added. This can avoid reading unnecessary data files.
if (queryString.contains(tableName)) {
// This is location path where the data are stored
- String filePattern = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+ String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
PCollection<Row> table =
new TextTable(
@@ -83,8 +92,34 @@
}
/**
- * This is the default method in BeamTpcds.main method. Run job using SqlTransform.query()
- * method.
+ * Print the summary table after all jobs are finished.
+ * @param completion A collection of all TpcdsRunResult that are from finished jobs.
+ * @param numOfResults The number of results in the collection.
+ * @throws Exception
+ */
+ private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+ List<List<String>> summaryRowsList = new ArrayList<>();
+ for (int i = 0; i < numOfResults; i++) {
+ TpcdsRunResult tpcdsRunResult = completion.take().get();
+ List<String> list = new ArrayList<>();
+ list.add(tpcdsRunResult.getQueryName());
+ list.add(tpcdsRunResult.getJobName());
+ list.add(tpcdsRunResult.getDataSize());
+ list.add(tpcdsRunResult.getDialect());
+ // If the job is not successful, leave the run time related field blank
+ list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString(): "");
+ list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+ summaryRowsList.add(list);
+ }
+
+ System.out.println(SUMMARY_START);
+ System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+ }
+
+ /**
+ * This is the default method in BeamTpcds.main method. Run job using SqlTranform.query() method.
* @param args Command line arguments
* @throws Exception
*/
@@ -97,7 +132,7 @@
// Using ExecutorService and CompletionService to fulfill multi-threading functionality
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+ CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
// Make an array of pipelines, each pipeline is responsible for running a corresponding query.
Pipeline[] pipelines = new Pipeline[queryNameArr.length];
@@ -121,19 +156,26 @@
String queryString = QueryReader.readQuery(queryNameArr[i]);
PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
- tables
- .apply(
- SqlTransform.query(queryString))
- .apply(
- MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
- .apply(TextIO.write()
- .to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
- .withSuffix(".txt")
- .withNumShards(1));
+ try {
+ tables
+ .apply(
+ SqlTransform.query(queryString))
+ .apply(
+ MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+ .apply(TextIO.write()
+ .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
+ .withSuffix(".txt")
+ .withNumShards(1));
+ } catch (Exception e) {
+ Log.error("{} failed to execute", queryNameArr[i]);
+ e.printStackTrace();
+ }
completion.submit(new TpcdsRun(pipelines[i]));
}
executor.shutdown();
+
+ printExecutionSummary(completion, queryNameArr.length);
}
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
new file mode 100644
index 0000000..bddb6a8
--- /dev/null
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.tpcds;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate the tpcds queries execution summary on the command line after finishing all jobs.
+ */
+public class SummaryGenerator {
+ private static final int PADDING_SIZE = 2;
+ private static final String NEW_LINE = "\n";
+ private static final String TABLE_JOINT_SYMBOL = "+";
+ private static final String TABLE_V_SPLIT_SYMBOL = "|";
+ private static final String TABLE_H_SPLIT_SYMBOL = "-";
+
+ public static String generateTable(List<String> headersList, List<List<String>> rowsList,int... overRiddenHeaderHeight) {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ int rowHeight = overRiddenHeaderHeight.length > 0 ? overRiddenHeaderHeight[0] : 1;
+
+ Map<Integer,Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
+
+ stringBuilder.append(NEW_LINE);
+ stringBuilder.append(NEW_LINE);
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ stringBuilder.append(NEW_LINE);
+
+ for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
+ fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
+ }
+
+ stringBuilder.append(NEW_LINE);
+
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+
+ for (List<String> row : rowsList) {
+ for (int i = 0; i < rowHeight; i++) {
+ stringBuilder.append(NEW_LINE);
+ }
+ for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
+ fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
+ }
+ }
+
+ stringBuilder.append(NEW_LINE);
+ createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+ stringBuilder.append(NEW_LINE);
+ stringBuilder.append(NEW_LINE);
+
+ return stringBuilder.toString();
+ }
+
+ private static void fillSpace(StringBuilder stringBuilder, int length) {
+ for (int i = 0; i < length; i++) {
+ stringBuilder.append(" ");
+ }
+ }
+
+ /** Add a rowLine at the beginning, the middle between headersList and rowLists, the end of the summary table. */
+ private static void createRowLine(StringBuilder stringBuilder,int headersListSize, Map<Integer,Integer> columnMaxWidthMapping) {
+ for (int i = 0; i < headersListSize; i++) {
+ if(i == 0) {
+ stringBuilder.append(TABLE_JOINT_SYMBOL);
+ }
+
+ for (int j = 0; j < columnMaxWidthMapping.get(i) + PADDING_SIZE * 2 ; j++) {
+ stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
+ }
+ stringBuilder.append(TABLE_JOINT_SYMBOL);
+ }
+ }
+
+ /** Get the width of the summary table. */
+ private static Map<Integer,Integer> getMaximumWidthofTable(List<String> headersList, List<List<String>> rowsList) {
+ Map<Integer,Integer> columnMaxWidthMapping = new HashMap<>();
+
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ columnMaxWidthMapping.put(columnIndex, 0);
+ }
+
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ if(headersList.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
+ columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
+ }
+ }
+
+ for (List<String> row : rowsList) {
+ for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
+ if(row.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
+ columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
+ }
+ }
+ }
+
+ for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+ if(columnMaxWidthMapping.get(columnIndex) % 2 != 0) {
+ columnMaxWidthMapping.put(columnIndex, columnMaxWidthMapping.get(columnIndex) + 1);
+ }
+ }
+
+ return columnMaxWidthMapping;
+ }
+
+ private static int getOptimumCellPadding(int cellIndex,int datalength,Map<Integer,Integer> columnMaxWidthMapping,int cellPaddingSize) {
+ if(datalength % 2 != 0) {
+ datalength++;
+ }
+
+ if(datalength < columnMaxWidthMapping.get(cellIndex)) {
+ cellPaddingSize = cellPaddingSize + (columnMaxWidthMapping.get(cellIndex) - datalength) / 2;
+ }
+
+ return cellPaddingSize;
+ }
+
+ /** Use space to fill a single cell with optimum cell padding size. */
+ private static void fillCell(StringBuilder stringBuilder,String cell,int cellIndex,Map<Integer,Integer> columnMaxWidthMapping) {
+
+ int cellPaddingSize = getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
+
+ if(cellIndex == 0) {
+ stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+ }
+
+ fillSpace(stringBuilder, cellPaddingSize);
+ stringBuilder.append(cell);
+ if(cell.length() % 2 != 0) {
+ stringBuilder.append(" ");
+ }
+
+ fillSpace(stringBuilder, cellPaddingSize);
+
+ stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+ }
+}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
index 936c24f..1070a88 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
@@ -19,12 +19,13 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
import java.util.concurrent.Callable;
/**
* To fulfill multi-threaded execution
*/
-public class TpcdsRun implements Callable<PipelineResult> {
+public class TpcdsRun implements Callable<TpcdsRunResult> {
private final Pipeline pipeline;
public TpcdsRun (Pipeline pipeline) {
@@ -32,9 +33,24 @@
}
@Override
- public PipelineResult call() {
- PipelineResult pipelineResult = pipeline.run();
- pipelineResult.waitUntilFinish();
- return pipelineResult;
+ public TpcdsRunResult call() {
+ TpcdsRunResult tpcdsRunResult;
+
+ try {
+ PipelineResult pipelineResult = pipeline.run();
+ long startTimeStamp = System.currentTimeMillis();
+ State state = pipelineResult.waitUntilFinish();
+ long endTimeStamp = System.currentTimeMillis();
+
+ // Make sure to set the job status to be successful only when pipelineResult's final state is DONE.
+ boolean isSuccessful = state == State.DONE;
+ tpcdsRunResult = new TpcdsRunResult(isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
+ } catch (Exception e) {
+ // If the pipeline execution failed, return a result with failed status but don't interrupt other threads.
+ e.printStackTrace();
+ tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
+ }
+
+ return tpcdsRunResult;
}
}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
new file mode 100644
index 0000000..0e22dce
--- /dev/null
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.tpcds;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import java.sql.Timestamp;
+import java.util.Date;
+
+
+public class TpcdsRunResult {
+ private boolean isSuccessful;
+ private long startTime;
+ private long endTime;
+ private PipelineOptions pipelineOptions;
+ private PipelineResult pipelineResult;
+
+ public TpcdsRunResult(boolean isSuccessful, long startTime, long endTime, PipelineOptions pipelineOptions, PipelineResult pipelineResult) {
+ this.isSuccessful = isSuccessful;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.pipelineOptions = pipelineOptions;
+ this.pipelineResult = pipelineResult;
+ }
+
+ public boolean getIsSuccessful() { return isSuccessful; }
+
+ public Date getStartDate() {
+ Timestamp startTimeStamp = new Timestamp(startTime);
+ Date startDate = new Date(startTimeStamp.getTime());
+ return startDate;
+ }
+
+ public Date getEndDate() {
+ Timestamp endTimeStamp = new Timestamp(endTime);
+ Date endDate = new Date(endTimeStamp.getTime());
+ return endDate;
+ }
+
+ public double getElapsedTime() {
+ return (endTime - startTime) / 1000.0;
+ }
+
+ public PipelineOptions getPipelineOptions() { return pipelineOptions; }
+
+ public PipelineResult getPipelineResult() { return pipelineResult; }
+
+ public String getJobName() {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ return pipelineOptions.getJobName();
+ }
+
+ public String getQueryName() {
+ String jobName = getJobName();
+ int endIndex = jobName.indexOf("result");
+ String queryName = jobName.substring(0, endIndex);
+ return queryName;
+ }
+
+ public String getDataSize() throws Exception {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
+ }
+
+ public String getDialect() throws Exception {
+ PipelineOptions pipelineOptions = getPipelineOptions();
+ String queryPlannerClassName = pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
+ String dialect;
+ if (queryPlannerClassName.equals("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
+ dialect = "ZetaSQL";
+ } else {
+ dialect = "Calcite";
+ }
+ return dialect;
+ }
+}