[BEAM-9891] Generate query execution summary table after finishing jobs (#12601)

* [BEAM-9891] Generate query execution summary table after finishing jobs

* Print error message using LOG, check PipelineResult's state

Co-authored-by: Yuwei Fu <fuyuwei@google.com>
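
For context, a minimal sketch of how the new SummaryGenerator can be driven in
isolation (the row values below are hypothetical, not from a real run; in this
change the runners build the rows from each TpcdsRunResult instead):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    List<String> headers = Arrays.asList("Query Name", "Job Name", "Data Size",
            "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
    List<List<String>> rows = new ArrayList<>();
    // One hand-built row per finished job (illustrative values only).
    rows.add(Arrays.asList("query3", "query3result", "1G", "Calcite",
            "Successful", "2020-08-20 10:00:00.0", "2020-08-20 10:02:05.0", "125.0"));
    System.out.println(SummaryGenerator.generateTable(headers, rows));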
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
index cbc0577..f94b748 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
@@ -20,7 +20,7 @@
 import com.alibaba.fastjson.JSONObject;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -35,19 +35,27 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 * This class executes jobs using BeamSqlEnv; it uses BeamSqlEnv.executeDdl and BeamSqlEnv.parseQuery to run queries.
  */
 public class BeamSqlEnvRunner {
-    private static final String dataDirectory = "gs://beamsql_tpcds_1/data";
-    private static final String resultDirectory = "gs://beamsql_tpcds_1/tpcds_results";
+    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+    private static final String SUMMARY_START = "\nTPC-DS Query Execution Summary:";
+    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
+
+    private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class);
 
     private static String buildTableCreateStatement(String tableName) {
         String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
@@ -55,7 +63,7 @@
     }
 
     private static String buildDataLocation(String dataSize, String tableName) {
-        String dataLocation = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+        String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
         return dataLocation;
     }
 
@@ -78,7 +86,7 @@
 
         Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
         for (String tableName : schemaMap.keySet()) {
-            String dataLocation = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+            String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
             Schema tableSchema = schemaMap.get(tableName);
             Table table = Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
             inMemoryMetaStore.createTable(table);
@@ -86,6 +94,32 @@
     }
 
     /**
+     * Print the summary table after all jobs are finished.
+     * @param completion The CompletionService that collects the TpcdsRunResult of each finished job.
+     * @param numOfResults The number of results to take from the CompletionService.
+     * @throws Exception if a job is interrupted or its result cannot be retrieved.
+     */
+    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+        List<List<String>> summaryRowsList = new ArrayList<>();
+        for (int i = 0; i < numOfResults; i++) {
+            TpcdsRunResult tpcdsRunResult = completion.take().get();
+            List<String> list = new ArrayList<>();
+            list.add(tpcdsRunResult.getQueryName());
+            list.add(tpcdsRunResult.getJobName());
+            list.add(tpcdsRunResult.getDataSize());
+            list.add(tpcdsRunResult.getDialect());
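+            // If the job is not successful, leave the run time related fields blank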
+            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+            summaryRowsList.add(list);
+        }
+
+        System.out.println(SUMMARY_START);
+        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+    }
+
+    /**
 * This is the alternative method called by BeamTpcds.main. Run jobs using the BeamSqlEnv.parseQuery() method. (Does not perform well when running query96.)
      * @param args Command line arguments
      * @throws Exception
@@ -102,7 +136,7 @@
 
         // Using ExecutorService and CompletionService to fulfill multi-threading functionality
         ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
 
         // Directly create all tables and register them into inMemoryMetaStore before creating BeamSqlEnv object.
         registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
@@ -130,18 +164,25 @@
             pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
             String queryString = QueryReader.readQuery(queryNameArr[i]);
 
-            // Query execution
-            PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
+            try {
+                // Query execution
+                PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));
 
-            // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
-            PCollection<String> rowStrings = rows.apply(MapElements
-                    .into(TypeDescriptors.strings())
-                    .via((Row row) -> row.toString()));
-            rowStrings.apply(TextIO.write().to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
+                // Transform the result from PCollection<Row> into PCollection<String>, and write it to the location where results are stored.
+                PCollection<String> rowStrings = rows.apply(MapElements
+                        .into(TypeDescriptors.strings())
+                        .via((Row row) -> row.toString()));
+                rowStrings.apply(TextIO.write().to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName()).withSuffix(".txt").withNumShards(1));
+            } catch (Exception e) {
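+                // Log and move on so one failing query does not stop the remaining queries.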
+                Log.error("{} failed to execute", queryNameArr[i]);
+                e.printStackTrace();
+            }
 
             completion.submit(new TpcdsRun(pipelines[i]));
         }
 
         executor.shutdown();
+
+        printExecutionSummary(completion, queryNameArr.length);
     }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
index 2614ce2..a40cd12 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
@@ -19,7 +19,6 @@
 
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
@@ -30,18 +29,28 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.*;
 import org.apache.commons.csv.CSVFormat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 * This class executes jobs using PCollection and SqlTransform; it uses SqlTransform.query to run queries.
  */
 public class SqlTransformRunner {
-    private static final String dataDirectory = "gs://beamsql_tpcds_1/data";
-    private static final String resultDirectory = "gs://beamsql_tpcds_1/tpcds_results";
+    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+    private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results";
+    private static final String SUMMARY_START = "\nTPC-DS Query Execution Summary:";
+    private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", "Start Time", "End Time", "Elapsed Time(sec)");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);
 
     /**
      * Get all tables (in the form of TextTable) needed for a specific query execution
@@ -64,7 +73,7 @@
             // Only when queryString contains tableName, the table is relevant to this query and will be added. This can avoid reading unnecessary data files.
             if (queryString.contains(tableName)) {
                 // This is location path where the data are stored
-                String filePattern = dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
+                String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
 
                 PCollection<Row> table =
                         new TextTable(
@@ -83,8 +92,34 @@
     }
 
     /**
-     * This is the default method in BeamTpcds.main method. Run job using SqlTransform.query()
-     * method.
+     * Print the summary table after all jobs are finished.
+     * @param completion The CompletionService that collects the TpcdsRunResult of each finished job.
+     * @param numOfResults The number of results to take from the CompletionService.
+     * @throws Exception if a job is interrupted or its result cannot be retrieved.
+     */
+    private static void printExecutionSummary(CompletionService<TpcdsRunResult> completion, int numOfResults) throws Exception {
+        List<List<String>> summaryRowsList = new ArrayList<>();
+        for (int i = 0; i < numOfResults; i++) {
+            TpcdsRunResult tpcdsRunResult = completion.take().get();
+            List<String> list = new ArrayList<>();
+            list.add(tpcdsRunResult.getQueryName());
+            list.add(tpcdsRunResult.getJobName());
+            list.add(tpcdsRunResult.getDataSize());
+            list.add(tpcdsRunResult.getDialect());
+            // If the job is not successful, leave the run time related fields blank
+            list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getStartDate().toString() : "");
+            list.add(tpcdsRunResult.getIsSuccessful() ? tpcdsRunResult.getEndDate().toString() : "");
+            list.add(tpcdsRunResult.getIsSuccessful() ? Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+            summaryRowsList.add(list);
+        }
+
+        System.out.println(SUMMARY_START);
+        System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, summaryRowsList));
+    }
+
+    /**
+     * This is the default method called by BeamTpcds.main. Run jobs using the SqlTransform.query() method.
      * @param args Command line arguments
      * @throws Exception
      */
@@ -97,7 +132,7 @@
 
         // Using ExecutorService and CompletionService to fulfill multi-threading functionality
         ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
 
         // Make an array of pipelines, each pipeline is responsible for running a corresponding query.
         Pipeline[] pipelines = new Pipeline[queryNameArr.length];
@@ -121,19 +156,26 @@
             String queryString = QueryReader.readQuery(queryNameArr[i]);
             PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
 
-            tables
-                    .apply(
-                            SqlTransform.query(queryString))
-                    .apply(
-                            MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
-                    .apply(TextIO.write()
-                            .to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
-                            .withSuffix(".txt")
-                            .withNumShards(1));
+            try {
+                tables
+                        .apply(
+                                SqlTransform.query(queryString))
+                        .apply(
+                                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+                        .apply(TextIO.write()
+                                .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
+                                .withSuffix(".txt")
+                                .withNumShards(1));
+            } catch (Exception e) {
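+                // Log and move on so one failing query does not stop the remaining queries.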
+                Log.error("{} failed to execute", queryNameArr[i]);
+                e.printStackTrace();
+            }
 
             completion.submit(new TpcdsRun(pipelines[i]));
         }
 
         executor.shutdown();
+
+        printExecutionSummary(completion, queryNameArr.length);
     }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
new file mode 100644
index 0000000..bddb6a8
--- /dev/null
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SummaryGenerator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.tpcds;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates the TPC-DS query execution summary table on the command line after all jobs finish.
+ */
+public class SummaryGenerator {
+    private static final int PADDING_SIZE = 2;
+    private static final String NEW_LINE = "\n";
+    private static final String TABLE_JOINT_SYMBOL = "+";
+    private static final String TABLE_V_SPLIT_SYMBOL = "|";
+    private static final String TABLE_H_SPLIT_SYMBOL = "-";
+
+    public static String generateTable(List<String> headersList, List<List<String>> rowsList, int... overriddenRowHeight) {
+        StringBuilder stringBuilder = new StringBuilder();
+
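+        // rowHeight is the number of newlines printed before each data row (1 = single-spaced rows).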
+        int rowHeight = overriddenRowHeight.length > 0 ? overriddenRowHeight[0] : 1;
+
+        Map<Integer,Integer> columnMaxWidthMapping = getMaximumWidthofTable(headersList, rowsList);
+
+        stringBuilder.append(NEW_LINE);
+        stringBuilder.append(NEW_LINE);
+        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+        stringBuilder.append(NEW_LINE);
+
+        for (int headerIndex = 0; headerIndex < headersList.size(); headerIndex++) {
+            fillCell(stringBuilder, headersList.get(headerIndex), headerIndex, columnMaxWidthMapping);
+        }
+
+        stringBuilder.append(NEW_LINE);
+
+        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+
+        for (List<String> row : rowsList) {
+            for (int i = 0; i < rowHeight; i++) {
+                stringBuilder.append(NEW_LINE);
+            }
+            for (int cellIndex = 0; cellIndex < row.size(); cellIndex++) {
+                fillCell(stringBuilder, row.get(cellIndex), cellIndex, columnMaxWidthMapping);
+            }
+        }
+
+        stringBuilder.append(NEW_LINE);
+        createRowLine(stringBuilder, headersList.size(), columnMaxWidthMapping);
+        stringBuilder.append(NEW_LINE);
+        stringBuilder.append(NEW_LINE);
+
+        return stringBuilder.toString();
+    }
+
+    private static void fillSpace(StringBuilder stringBuilder, int length) {
+        for (int i = 0; i < length; i++) {
+            stringBuilder.append(" ");
+        }
+    }
+
+    /** Add a horizontal divider line at the top of the summary table, between the headers and the rows, and at the bottom. */
+    private static void createRowLine(StringBuilder stringBuilder, int headersListSize, Map<Integer, Integer> columnMaxWidthMapping) {
+        for (int i = 0; i < headersListSize; i++) {
+            if (i == 0) {
+                stringBuilder.append(TABLE_JOINT_SYMBOL);
+            }
+
+            for (int j = 0; j < columnMaxWidthMapping.get(i) + PADDING_SIZE * 2 ; j++) {
+                stringBuilder.append(TABLE_H_SPLIT_SYMBOL);
+            }
+            stringBuilder.append(TABLE_JOINT_SYMBOL);
+        }
+    }
+
+    /** Compute the maximum content width of each column of the summary table. */
+    private static Map<Integer,Integer> getMaximumWidthofTable(List<String> headersList, List<List<String>> rowsList) {
+        Map<Integer,Integer> columnMaxWidthMapping = new HashMap<>();
+
+        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+            columnMaxWidthMapping.put(columnIndex, 0);
+        }
+
+        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+            if (headersList.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
+                columnMaxWidthMapping.put(columnIndex, headersList.get(columnIndex).length());
+            }
+        }
+
+        for (List<String> row : rowsList) {
+            for (int columnIndex = 0; columnIndex < row.size(); columnIndex++) {
+                if (row.get(columnIndex).length() > columnMaxWidthMapping.get(columnIndex)) {
+                    columnMaxWidthMapping.put(columnIndex, row.get(columnIndex).length());
+                }
+            }
+        }
+
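+        // Round odd column widths up to even so cell contents can be centered with symmetric padding.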
+        for (int columnIndex = 0; columnIndex < headersList.size(); columnIndex++) {
+            if (columnMaxWidthMapping.get(columnIndex) % 2 != 0) {
+                columnMaxWidthMapping.put(columnIndex, columnMaxWidthMapping.get(columnIndex) + 1);
+            }
+        }
+
+        return columnMaxWidthMapping;
+    }
+
+    private static int getOptimumCellPadding(int cellIndex, int dataLength, Map<Integer, Integer> columnMaxWidthMapping, int cellPaddingSize) {
+        if (dataLength % 2 != 0) {
+            dataLength++;
+        }
+
+        if (dataLength < columnMaxWidthMapping.get(cellIndex)) {
+            cellPaddingSize = cellPaddingSize + (columnMaxWidthMapping.get(cellIndex) - dataLength) / 2;
+        }
+
+        return cellPaddingSize;
+    }
+
+    /** Pad a single cell with spaces on both sides, using the optimum cell padding size. */
+    private static void fillCell(StringBuilder stringBuilder, String cell, int cellIndex, Map<Integer, Integer> columnMaxWidthMapping) {
+
+        int cellPaddingSize = getOptimumCellPadding(cellIndex, cell.length(), columnMaxWidthMapping, PADDING_SIZE);
+
+        if (cellIndex == 0) {
+            stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+        }
+
+        fillSpace(stringBuilder, cellPaddingSize);
+        stringBuilder.append(cell);
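+        // Odd-length content gets one extra trailing space so the cell lines up with the even column width.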
+        if (cell.length() % 2 != 0) {
+            stringBuilder.append(" ");
+        }
+
+        fillSpace(stringBuilder, cellPaddingSize);
+
+        stringBuilder.append(TABLE_V_SPLIT_SYMBOL);
+    }
+}
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
index 936c24f..1070a88 100644
--- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
@@ -19,12 +19,13 @@
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
 import java.util.concurrent.Callable;
 
 /**
 * Runs a single TPC-DS query pipeline on its own thread to enable multi-threaded execution.
  */
-public class TpcdsRun implements Callable<PipelineResult> {
+public class TpcdsRun implements Callable<TpcdsRunResult> {
     private final Pipeline pipeline;
 
     public TpcdsRun (Pipeline pipeline) {
@@ -32,9 +33,24 @@
     }
 
     @Override
-    public PipelineResult call() {
-        PipelineResult pipelineResult = pipeline.run();
-        pipelineResult.waitUntilFinish();
-        return pipelineResult;
+    public TpcdsRunResult call() {
+        TpcdsRunResult tpcdsRunResult;
+
+        try {
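+            // pipeline.run() returns once the job is submitted; the timestamps bracket waitUntilFinish() to measure total run time.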
+            PipelineResult pipelineResult = pipeline.run();
+            long startTimeStamp = System.currentTimeMillis();
+            State state = pipelineResult.waitUntilFinish();
+            long endTimeStamp = System.currentTimeMillis();
+
+            // Make sure to set the job status to be successful only when pipelineResult's final state is DONE.
+            boolean isSuccessful = state == State.DONE;
+            tpcdsRunResult = new TpcdsRunResult(isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
+        } catch (Exception e) {
+            // If the pipeline execution failed, return a result with failed status but don't interrupt other threads.
+            e.printStackTrace();
+            tpcdsRunResult = new TpcdsRunResult(false, 0, 0, pipeline.getOptions(), null);
+        }
+
+        return tpcdsRunResult;
     }
 }
diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
new file mode 100644
index 0000000..0e22dce
--- /dev/null
+++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRunResult.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.tpcds;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import java.util.Date;
+
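+/**
+ * Holds the result of a single TpcdsRun: whether the job succeeded, its start and end
+ * times (epoch milliseconds), and the PipelineOptions and PipelineResult of the run.
+ */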
+public class TpcdsRunResult {
+    private final boolean isSuccessful;
+    private final long startTime;
+    private final long endTime;
+    private final PipelineOptions pipelineOptions;
+    private final PipelineResult pipelineResult;
+
+    public TpcdsRunResult(boolean isSuccessful, long startTime, long endTime, PipelineOptions pipelineOptions, PipelineResult pipelineResult) {
+        this.isSuccessful = isSuccessful;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.pipelineOptions = pipelineOptions;
+        this.pipelineResult = pipelineResult;
+    }
+
+    public boolean getIsSuccessful() { return isSuccessful; }
+
+    public Date getStartDate() {
+        return new Date(startTime);
+    }
+
+    public Date getEndDate() {
+        return new Date(endTime);
+    }
+
+    public double getElapsedTime() {
+        return (endTime - startTime) / 1000.0;
+    }
+
+    public PipelineOptions getPipelineOptions() { return pipelineOptions; }
+
+    public PipelineResult getPipelineResult() { return pipelineResult; }
+
+    public String getJobName() {
+        PipelineOptions pipelineOptions = getPipelineOptions();
+        return pipelineOptions.getJobName();
+    }
+
+    public String getQueryName() {
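+        // The job name is assumed to have the form "<queryName>result..."; fall back to the full name if "result" is absent.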
+        String jobName = getJobName();
+        int endIndex = jobName.indexOf("result");
+        String queryName = jobName.substring(0, endIndex);
+        return queryName;
+    }
+
+    public String getDataSize() throws Exception {
+        PipelineOptions pipelineOptions = getPipelineOptions();
+        return TpcdsParametersReader.getAndCheckDataSize(pipelineOptions.as(TpcdsOptions.class));
+    }
+
+    public String getDialect() throws Exception {
+        PipelineOptions pipelineOptions = getPipelineOptions();
+        String queryPlannerClassName = pipelineOptions.as(BeamSqlPipelineOptions.class).getPlannerName();
+        String dialect;
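+        // The ZetaSQL planner implies the ZetaSQL dialect; any other planner (the default Calcite planner) is reported as Calcite.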
+        if (queryPlannerClassName.equals("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")) {
+            dialect = "ZetaSQL";
+        } else {
+            dialect = "Calcite";
+        }
+        return dialect;
+    }
+}