[GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)

Report dataset metrics summary on temporal
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 282b3cb..76c5f56 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -18,6 +18,9 @@
 package org.apache.gobblin.runtime;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
 import org.apache.gobblin.metrics.DatasetMetric;
 
@@ -27,11 +30,13 @@
  * that can be reported as a single event in the commit phase.
  */
 @Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class DatasetTaskSummary {
-  private final String datasetUrn;
-  private final long recordsWritten;
-  private final long bytesWritten;
-  private final boolean successfullyCommitted;
+  @NonNull private String datasetUrn;
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
+  @NonNull private boolean successfullyCommitted;
 
   /**
    * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 7a8d42c..3f6dca4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -93,7 +93,7 @@
   private final String jobId;
   private final String jobSequence;
   private final JobState jobState;
-  @Getter(AccessLevel.PACKAGE)
+  @Getter
   private final JobCommitPolicy jobCommitPolicy;
   // A job commit semantic where we want partially successful tasks to commit their data, but still fail the job
   // WARNING: this is for Gobblin jobs that do not record their watermark, hence this would not lead to duplicate work
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
index 1f29a7a..29fbfc7 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -19,6 +19,8 @@
 
 import io.temporal.activity.ActivityInterface;
 import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -32,5 +34,5 @@
    * @return number of workunits committed
    */
   @ActivityMethod
-  int commit(WUProcessingSpec workSpec);
+  CommitStats commit(WUProcessingSpec workSpec);
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 94b5420..f409e51 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@
 import com.google.api.client.util.Lists;
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import io.temporal.failure.ApplicationFailure;
 
@@ -44,6 +46,7 @@
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobContext;
@@ -51,18 +54,21 @@
 import org.apache.gobblin.runtime.SafeDatasetCommit;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
-
 @Slf4j
 public class CommitActivityImpl implements CommitActivity {
 
@@ -71,7 +77,7 @@
   static String UNDEFINED_JOB_NAME = "<job_name_stub>";
 
   @Override
-  public int commit(WUProcessingSpec workSpec) {
+  public CommitStats commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
     Optional<String> optJobName = Optional.empty();
@@ -84,11 +90,20 @@
       troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
       troubleshooter.start();
       List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
-      if (!taskStates.isEmpty()) {
-        JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
-        commitTaskStates(jobState, taskStates, jobContext);
+      if (taskStates.isEmpty()) {
+        return CommitStats.createEmpty();
       }
-      return taskStates.size();
+
+      JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+      Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
+      TaskState firstTaskState = taskStates.get(0);
+      log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+      boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+      Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
+      return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -106,17 +121,11 @@
   /**
    * Commit task states to the dataset state store.
    * @param jobState
-   * @param taskStates
+   * @param datasetStatesByUrns
    * @param jobContext
    * @throws IOException
    */
-  private void commitTaskStates(JobState jobState, List<TaskState> taskStates, JobContext jobContext) throws IOException {
-    if (!taskStates.isEmpty()) {
-      TaskState firstTaskState = taskStates.get(0);
-      log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
-    }
-    //TODO: handle skipped tasks?
-    Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
+  private void commitTaskStates(JobState jobState, Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext) throws IOException {
     final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
     final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
     //TODO: Make this configurable
@@ -149,13 +158,11 @@
 
       IteratorExecutor.logFailures(result, null, 10);
 
-      Set<String> failedDatasetUrns = new HashSet<>();
-      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
-        // Set the overall job state to FAILED if the job failed to process any dataset
-        if (datasetState.getState() == JobState.RunningState.FAILED) {
-          failedDatasetUrns.add(datasetState.getDatasetUrn());
-        }
-      }
+      Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+          .filter(datasetState -> datasetState.getState() == JobState.RunningState.FAILED)
+          .map(JobState.DatasetState::getDatasetUrn)
+          .collect(Collectors.toCollection(HashSet::new));
+
       if (!failedDatasetUrns.isEmpty()) {
         String allFailedDatasets = String.join(", ", failedDatasetUrns);
         log.error("Failed to commit dataset state for dataset(s) {}" + allFailedDatasets);
@@ -194,6 +201,37 @@
     });
   }
 
+  private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy, boolean shouldIncludeFailedTasks) {
+    Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+    // Summarize COMMITTED datasets, plus FAILED datasets when the commit policy is COMMIT_SUCCESSFUL_TASKS
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        int totalCommittedTasks = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics (e.g. CompactionLauncherWriter), hence the default of 0 below
+          if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || shouldIncludeFailedTasks) {
+            if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
+              totalCommittedTasks++;
+            }
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(totalRecordsWritten, totalBytesWritten, true, totalCommittedTasks));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED && commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+        // FAILED dataset under COMMIT_ON_FULL_SUCCESS: report zero records/bytes and mark it as not successfully committed
+        log.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats( 0, 0, false, 0));
+      }
+    }
+    return datasetTaskStats;
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 33c6d5f..6950e6a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -34,6 +34,7 @@
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -85,8 +86,8 @@
       EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
           .withGaaSJobProps(this.jobProps)
           .build();
-      int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
-      log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+      ExecGobblinStats execGobblinStats = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
+      log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
new file mode 100644
index 0000000..f929831
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Per-dataset commit stats, keyed by dataset URN, plus the total number of committed workunits in the Gobblin Temporal job.
+ * Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
+ * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class CommitStats {
+  @NonNull private Map<String, DatasetStats> datasetStats;
+  @NonNull private int numCommittedWorkUnits;
+
+  public static CommitStats createEmpty() {
+    return new CommitStats(new HashMap<>(), 0);
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
new file mode 100644
index 0000000..b795566
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
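+ * Per-dataset stats from the commit step: records and bytes written, whether it was successfully committed, and the number of committed workunits.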
+ */
+@Data
+@NonNull
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class DatasetStats {
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
+  @NonNull private boolean successfullyCommitted;
+  @NonNull private int numCommittedWorkunits;
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
new file mode 100644
index 0000000..abaae2a
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Capture details (esp. for the temporal UI) of a {@link org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
+@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class ExecGobblinStats {
+  @NonNull private int numWorkUnits;
+  @NonNull private int numCommitted;
+  @NonNull private String user;
+  @NonNull private Map<String, DatasetStats> stats;
+}
+
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 1474967..462e784 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -80,10 +80,10 @@
     return name + "_" + calcPerExecQualifier(workerConfig);
   }
 
-  /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-  public static String qualifyNamePerExecWithFlowExecId(String name, Config jobProps) {
-    Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, jobProps);
+  /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `config` */
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config config) {
+    Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(config, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, config);
   }
 
   public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> optFlowExecId, Config workerConfig) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index f6f4970..c536828 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -19,6 +19,8 @@
 
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -33,5 +35,5 @@
      * @return number of workunits committed
      */
     @WorkflowMethod
-    int commit(WUProcessingSpec workSpec);
+    CommitStats commit(WUProcessingSpec workSpec);
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index d764f37..1ffcf36 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -23,6 +23,7 @@
 import io.temporal.workflow.WorkflowMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
@@ -37,5 +38,5 @@
 public interface ExecuteGobblinWorkflow {
   /** @return the number of {@link WorkUnit}s discovered and successfully processed */
   @WorkflowMethod
-  int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+  ExecGobblinStats execute(Properties props, EventSubmitterContext eventSubmitterContext);
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index eccad9b..a6018d4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -21,6 +21,7 @@
 import io.temporal.workflow.WorkflowMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -29,5 +30,5 @@
 public interface ProcessWorkUnitsWorkflow {
   /** @return the number of {@link WorkUnit}s cumulatively processed successfully */
   @WorkflowMethod
-  int process(WUProcessingSpec wuSpec);
+  CommitStats process(WUProcessingSpec wuSpec);
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 2b674ec..263ed7e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -22,11 +22,21 @@
 import io.temporal.workflow.Workflow;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 
 
 @Slf4j
@@ -47,7 +57,20 @@
   private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
 
   @Override
-  public int commit(WUProcessingSpec workSpec) {
-    return activityStub.commit(workSpec);
+  public CommitStats commit(WUProcessingSpec workSpec) {
+    CommitStats commitGobblinStats = activityStub.commit(workSpec);
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+        .submit();
+    return commitGobblinStats;
+  }
+
+  private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), entry.getValue().isSuccessfullyCommitted()));
+    }
+    return datasetTaskSummaries;
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 5da320d..9d6776a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,6 +39,8 @@
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -71,43 +73,31 @@
       GEN_WUS_ACTIVITY_OPTS);
 
   @Override
-  public int execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+  public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-    int numWUsGenerated = 0;
     try {
-      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      int numWUsCommitted = 0;
+      CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
-        JobState jobState = new JobState(jobProps);
-        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext);
         ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps);
-        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
-        // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
-        if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
-            && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
-          int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-          int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
-        }
-
-        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-        if (numWUsProcessed != numWUsGenerated) {
-          log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
-          // TODO provide more robust indication that things went wrong!  (retryable or non-retryable error??)
-        }
+        commitStats = processWUsWorkflow.process(wuSpec);
+        numWUsCommitted = commitStats.getNumCommittedWorkUnits();
       }
       timer.stop();
+      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
     } catch (Exception e) {
       // Emit a failed GobblinTrackingEvent to record job failures
-      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
-          e.getClass().toString(),
-          e
+          e.getClass().getName(),
+          e,
+          null
       );
     }
-    return numWUsGenerated;
   }
 
   protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties jobProps) {
@@ -117,4 +107,19 @@
         .build();
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts);
   }
+
+  protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+    JobState jobState = new JobState(jobProps);
+    URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+    Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+    WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+    // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+    if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+        && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+      int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+      int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+      wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
+    }
+    return wuSpec;
+  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 844e557..c8afbce 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
@@ -48,14 +49,14 @@
   public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
 
   @Override
-  public int process(WUProcessingSpec workSpec) {
+  public CommitStats process(WUProcessingSpec workSpec) { // wraps performWork() with an optional job event timer
     Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
-    int result = performWork(workSpec);
+    CommitStats result = performWork(workSpec); // NOTE(review): if this throws, the timer below is never stopped - confirm that is intended
     timer.ifPresent(EventTimer::stop);
     return result;
   }
 
-  private int performWork(WUProcessingSpec workSpec) {
+  private CommitStats performWork(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
     int workunitsProcessed = processingWorkflow.performWorkload(
@@ -64,14 +65,14 @@
     );
     if (workunitsProcessed > 0) {
       CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
-      int result = commitWorkflow.commit(workSpec);
-      if (result == 0) {
+      CommitStats result = commitWorkflow.commit(workSpec);
+      if (result.getNumCommittedWorkUnits() == 0) {
         log.warn("No work units committed at the job level. They could have been committed at the task level.");
       }
       return result;
     } else {
       log.error("No work units processed, so no commit attempted.");
-      return 0;
+      return CommitStats.createEmpty();
     }
   }
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 9d26360..0cb03a1 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -60,7 +60,7 @@
         TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
         try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
             LOG.info("Executing getGreeting");
-            timer.addMetadata("name", name);
+            timer.withMetadata("name", name);
             return formatActivity.composeGreeting(name);
         }
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 677baf4..003a059 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -45,7 +45,7 @@
    * @param key
    * @param metadata
    */
-  void addMetadata(String key, String metadata);
+  EventTimer withMetadata(String key, String metadata);
 
   /**
    * Stops the timer and execute any post-processing (e.g. event submission)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index a1fba1e..c9d9e94 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -44,17 +44,21 @@
   private final EventSubmitterContext eventSubmitterContext;
   private final Instant startTime;
 
+  /** Alias to {@link #stop()}. */
+  public void submit() {
+    this.stop();
+  }
   @Override
   public void stop() {
     stop(getCurrentTime());
   }
 
   @Override
-  public void addMetadata(String key, String metadata) {
+  public TemporalEventTimer withMetadata(String key, String metadata) { // adds the key/metadata pair to the event builder
     this.eventBuilder.addMetadata(key, metadata);
+    return this; // same instance, so further calls can be chained
   }
 
-
   private void stop(Instant endTime) {
     this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, TimingEvent.METADATA_TIMING_EVENT);
     this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, Long.toString(this.startTime.toEpochMilli()));