[GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
Report dataset metrics summary on temporal
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 282b3cb..76c5f56 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -18,6 +18,9 @@
package org.apache.gobblin.runtime;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
import org.apache.gobblin.metrics.DatasetMetric;
@@ -27,11 +30,13 @@
* that can be reported as a single event in the commit phase.
*/
@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetTaskSummary {
- private final String datasetUrn;
- private final long recordsWritten;
- private final long bytesWritten;
- private final boolean successfullyCommitted;
+ @NonNull private String datasetUrn;
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
+ @NonNull private boolean successfullyCommitted;
/**
* Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 7a8d42c..3f6dca4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -93,7 +93,7 @@
private final String jobId;
private final String jobSequence;
private final JobState jobState;
- @Getter(AccessLevel.PACKAGE)
+ @Getter
private final JobCommitPolicy jobCommitPolicy;
// A job commit semantic where we want partially successful tasks to commit their data, but still fail the job
// WARNING: this is for Gobblin jobs that do not record their watermark, hence this would not lead to duplicate work
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
index 1f29a7a..29fbfc7 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -19,6 +19,8 @@
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -32,5 +34,5 @@
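- * @return number of workunits committed
+ * @return {@link CommitStats} with per-dataset stats and the total number of workunits committed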
*/
@ActivityMethod
- int commit(WUProcessingSpec workSpec);
+ CommitStats commit(WUProcessingSpec workSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 94b5420..f409e51 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.temporal.failure.ApplicationFailure;
@@ -44,6 +46,7 @@
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
@@ -51,18 +54,21 @@
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-
@Slf4j
public class CommitActivityImpl implements CommitActivity {
@@ -71,7 +77,7 @@
static String UNDEFINED_JOB_NAME = "<job_name_stub>";
@Override
- public int commit(WUProcessingSpec workSpec) {
+ public CommitStats commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
@@ -84,11 +90,20 @@
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
- if (!taskStates.isEmpty()) {
- JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
- commitTaskStates(jobState, taskStates, jobContext);
+ if (taskStates.isEmpty()) {
+ return CommitStats.createEmpty();
}
- return taskStates.size();
+
+ JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+ Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
+ TaskState firstTaskState = taskStates.get(0);
+ log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+ boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+ Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
+ return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -106,17 +121,11 @@
/**
* Commit task states to the dataset state store.
* @param jobState
- * @param taskStates
+ * @param datasetStatesByUrns
* @param jobContext
* @throws IOException
*/
- private void commitTaskStates(JobState jobState, List<TaskState> taskStates, JobContext jobContext) throws IOException {
- if (!taskStates.isEmpty()) {
- TaskState firstTaskState = taskStates.get(0);
- log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
- }
- //TODO: handle skipped tasks?
- Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
+ private void commitTaskStates(JobState jobState, Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext) throws IOException {
final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
//TODO: Make this configurable
@@ -149,13 +158,11 @@
IteratorExecutor.logFailures(result, null, 10);
- Set<String> failedDatasetUrns = new HashSet<>();
- for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
- // Set the overall job state to FAILED if the job failed to process any dataset
- if (datasetState.getState() == JobState.RunningState.FAILED) {
- failedDatasetUrns.add(datasetState.getDatasetUrn());
- }
- }
+ Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+ .filter(datasetState -> datasetState.getState() == JobState.RunningState.FAILED)
+ .map(JobState.DatasetState::getDatasetUrn)
+ .collect(Collectors.toCollection(HashSet::new));
+
if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}" + allFailedDatasets);
@@ -194,6 +201,37 @@
});
}
+ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy, boolean shouldIncludeFailedTasks) {
+ Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+ // Datasets that are COMMITTED (or FAILED under COMMIT_SUCCESSFUL_TASKS) get summed task metrics; any dataset matching neither branch below is omitted
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+ if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+ && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ long totalBytesWritten = 0;
+ long totalRecordsWritten = 0;
+ int totalCommittedTasks = 0;
+ for (TaskState taskState : datasetState.getTaskStates()) {
+ // Non-COMMITTED tasks add to bytes/records only when shouldIncludeFailedTasks; metrics default to 0 since certain writers omit them, e.g. CompactionLauncherWriter
+ if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || shouldIncludeFailedTasks) {
+ if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
+ totalCommittedTasks++;
+ }
+ totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+ totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+ }
+ }
+ log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+ totalRecordsWritten, totalBytesWritten));
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(totalRecordsWritten, totalBytesWritten, true, totalCommittedTasks));
+ } else if (datasetState.getState() == JobState.RunningState.FAILED && commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+ // Under COMMIT_ON_FULL_SUCCESS a FAILED dataset is reported with zero records/bytes/workunits and flagged as not successfully committed
+ log.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats( 0, 0, false, 0));
+ }
+ }
+ return datasetTaskStats;
+ }
+
/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 33c6d5f..6950e6a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -34,6 +34,7 @@
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -85,8 +86,8 @@
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(this.jobProps)
.build();
- int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
- log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ ExecGobblinStats execGobblinStats = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
+ log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
new file mode 100644
index 0000000..f929831
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Per-dataset commit stats, keyed by dataset URN, plus the total number of workunits committed across those datasets.
+ * Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
+ * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class CommitStats {
+ @NonNull private Map<String, DatasetStats> datasetStats;
+ @NonNull private int numCommittedWorkUnits;
+
+ public static CommitStats createEmpty() {
+ return new CommitStats(new HashMap<>(), 0);
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
new file mode 100644
index 0000000..b795566
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Per-dataset stats gathered at commit (also built for failed datasets, with {@code successfullyCommitted == false}).
+ * NOTE: {@code @NonNull} stays on each field only so that {@code @RequiredArgsConstructor} includes it in the generated constructor.
+ */
+@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class DatasetStats {
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
+ @NonNull private boolean successfullyCommitted;
+ @NonNull private int numCommittedWorkunits;
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
new file mode 100644
index 0000000..abaae2a
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Capture details (esp. for the temporal UI) of a {@link org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
+@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class ExecGobblinStats {
+ @NonNull private int numWorkUnits; // number of workunits generated for the execution
+ @NonNull private int numCommitted; // number of workunits committed
+ @NonNull private String user; // NOTE(review): the generated constructor rejects null here - confirm callers never pass an unset property value
+ @NonNull private Map<String, DatasetStats> stats; // per-dataset stats, keyed by dataset URN
+}
+
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 1474967..462e784 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -80,10 +80,10 @@
return name + "_" + calcPerExecQualifier(workerConfig);
}
- /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
- public static String qualifyNamePerExecWithFlowExecId(String name, Config jobProps) {
- Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
- return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, jobProps);
+ /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `config` */
+ public static String qualifyNamePerExecWithFlowExecId(String name, Config config) {
+ Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(config, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+ return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, config);
}
public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> optFlowExecId, Config workerConfig) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index f6f4970..c536828 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -19,6 +19,8 @@
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -33,5 +35,5 @@
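- * @return number of workunits committed
+ * @return {@link CommitStats} with per-dataset stats and the total number of workunits committed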
*/
@WorkflowMethod
- int commit(WUProcessingSpec workSpec);
+ CommitStats commit(WUProcessingSpec workSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index d764f37..1ffcf36 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -23,6 +23,7 @@
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -37,5 +38,5 @@
public interface ExecuteGobblinWorkflow {
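- /** @return the number of {@link WorkUnit}s discovered and successfully processed */
+ /** @return {@link ExecGobblinStats} with the number of {@link WorkUnit}s discovered and committed, plus per-dataset stats */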
@WorkflowMethod
- int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+ ExecGobblinStats execute(Properties props, EventSubmitterContext eventSubmitterContext);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index eccad9b..a6018d4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -21,6 +21,7 @@
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -29,5 +30,5 @@
public interface ProcessWorkUnitsWorkflow {
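- /** @return the number of {@link WorkUnit}s cumulatively processed successfully */
+ /** @return {@link CommitStats} for the {@link WorkUnit}s processed and committed (empty when none were processed) */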
@WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+ CommitStats process(WUProcessingSpec wuSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 2b674ec..263ed7e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -22,11 +22,21 @@
import io.temporal.workflow.Workflow;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
@@ -47,7 +57,20 @@
private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
@Override
- public int commit(WUProcessingSpec workSpec) {
- return activityStub.commit(workSpec);
+ public CommitStats commit(WUProcessingSpec workSpec) {
+ CommitStats commitGobblinStats = activityStub.commit(workSpec);
+ TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();
+ return commitGobblinStats;
+ }
+
+ private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+ List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+ for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+ datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), entry.getValue().isSuccessfullyCommitted()));
+ }
+ return datasetTaskSummaries;
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 5da320d..9d6776a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,6 +39,8 @@
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -71,43 +73,31 @@
GEN_WUS_ACTIVITY_OPTS);
@Override
- public int execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
- int numWUsGenerated = 0;
try {
- numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+ int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+ int numWUsCommitted = 0;
+ CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext);
ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
- if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
- && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
- int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong! (retryable or non-retryable error??)
- }
+ commitStats = processWUsWorkflow.process(wuSpec);
+ numWUsCommitted = commitStats.getNumCommittedWorkUnits();
}
timer.stop();
+ return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
- timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
- e.getClass().toString(),
- e
+ e.getClass().getName(),
+ e,
+ null
);
}
- return numWUsGenerated;
}
protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties jobProps) {
@@ -117,4 +107,19 @@
.build();
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts);
}
+
+ protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+ // Tuning is applied only when BOTH props are present. TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+ if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+ int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
+ }
+ return wuSpec;
+ }
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 844e557..c8afbce 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
@@ -48,14 +49,14 @@
public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
@Override
- public int process(WUProcessingSpec workSpec) {
+ public CommitStats process(WUProcessingSpec workSpec) {
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
- int result = performWork(workSpec);
+ CommitStats result = performWork(workSpec);
timer.ifPresent(EventTimer::stop);
return result;
}
- private int performWork(WUProcessingSpec workSpec) {
+ private CommitStats performWork(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
int workunitsProcessed = processingWorkflow.performWorkload(
@@ -64,14 +65,14 @@
);
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
- int result = commitWorkflow.commit(workSpec);
- if (result == 0) {
+ CommitStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");
}
return result;
} else {
log.error("No work units processed, so no commit attempted.");
- return 0;
+ return CommitStats.createEmpty();
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 9d26360..0cb03a1 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -60,7 +60,7 @@
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
- timer.addMetadata("name", name);
+ timer.withMetadata("name", name);
return formatActivity.composeGreeting(name);
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 677baf4..003a059 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -45,7 +45,7 @@
* @param key
* @param metadata
*/
- void addMetadata(String key, String metadata);
+ EventTimer withMetadata(String key, String metadata);
/**
* Stops the timer and execute any post-processing (e.g. event submission)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index a1fba1e..c9d9e94 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -44,17 +44,21 @@
private final EventSubmitterContext eventSubmitterContext;
private final Instant startTime;
+ /** Alias for {@link #stop()}, for callers that create an event and submit it right away */
+ public void submit() {
+ stop();
+ }
@Override
public void stop() {
stop(getCurrentTime());
}
@Override
- public void addMetadata(String key, String metadata) {
+ public TemporalEventTimer withMetadata(String key, String metadata) {
this.eventBuilder.addMetadata(key, metadata);
+ return this;
}
-
private void stop(Instant endTime) {
this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, TimingEvent.METADATA_TIMING_EVENT);
this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, Long.toString(this.startTime.toEpochMilli()));