[GOBBLIN-1989] TemporaI: Implementation of GTE for GaaS Observability Event in MR alternative for distcp (#3843)

[GOBBLIN-1989] TemporaI: Implementation of GTE for GaaS Observability Event in MR deprecation for distcp
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 2dfb4b7..220d95a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -50,6 +50,7 @@
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -143,7 +144,7 @@
     this.jobsMapping = jobsMapping;
     this.locks = locks;
     this.metricContext = metricContext;
-    eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.runtime").build();
+    eventSubmitter = new EventSubmitter.Builder(this.metricContext, JobMetrics.NAMESPACE).build();
   }
 
   private boolean isRetriggeringEnabled() {
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index 33f4b56..b84acb1 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -113,7 +114,7 @@
    * @param tags Additional tags to add to the returned context.
    * @return A {@link org.apache.gobblin.metrics.MetricContext} with the appropriate tags and parent.
    */
-  public static MetricContext getMetricContext(State state, Class<?> klazz, List<Tag<?>> tags) {
+  public static MetricContext getMetricContext(State state, Class<?> klazz, Collection<Tag<?>> tags) {
     int randomId = RAND.nextInt(Integer.MAX_VALUE);
 
     List<Tag<?>> generatedTags = Lists.newArrayList();
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
index 3ce4c79..b836a56 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
@@ -21,14 +21,15 @@
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Function;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import javax.annotation.Nullable;
+
 
 /**
  * A class representing a dimension or property associated with a {@link Taggable}.
@@ -66,6 +67,7 @@
     super(key, value);
   }
 
+  @JsonCreator
   public Tag(Map.Entry<? extends String, ? extends T> entry) {
     super(entry);
   }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 4805425..0fc4cba 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -17,16 +17,18 @@
 
 package org.apache.gobblin.metrics.event;
 
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import lombok.Getter;
+
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
+import org.apache.gobblin.metrics.Tag;
 
 
 /**
@@ -204,4 +206,11 @@
   public TimingEvent getTimingEvent(String name) {
     return new TimingEvent(this, name);
   }
+
+  public List<Tag<?>> getTags() {
+    return this.metricContext.isPresent() ?
+        this.metricContext.get().getTags() :
+        null;
+  }
+
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index a3c0a40..8a626f1 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -19,6 +19,8 @@
 
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
@@ -55,9 +57,17 @@
   }
 
   public GobblinEventBuilder(String name, String namespace) {
+    this(name, namespace, Maps.newHashMap());
+  }
+
+  @JsonCreator
+  private GobblinEventBuilder(
+      @JsonProperty("name") String name,
+      @JsonProperty("namespace") String namespace,
+      @JsonProperty("metadata") Map<String, String> metadata) {
     this.name = name;
     this.namespace = namespace;
-    metadata = Maps.newHashMap();
+    this.metadata = metadata;
   }
 
   public ImmutableMap<String, String> getMetadata() {
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index ff79122..7a0d4cf 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -128,7 +128,7 @@
     super(name);
     this.stopped = false;
     this.submitter = submitter;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = getStartTime();
   }
 
   /**
@@ -138,6 +138,15 @@
     stop(Maps.<String, String>newHashMap());
   }
 
+
+  public Long getStartTime() {
+    return System.currentTimeMillis();
+  }
+
+  public Long getEndTime() {
+    return System.currentTimeMillis();
+  }
+
   /**
    * Stop the timer and submit the event, along with the additional metadata specified. If the timer was already stopped
    * before, this is a no-op.
@@ -162,7 +171,7 @@
     }
 
     this.stopped = true;
-    this.endTime = System.currentTimeMillis();
+    this.endTime = getEndTime();
     this.duration = this.endTime - this.startTime;
 
     this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
index ac03e52..eb20d53 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
@@ -17,9 +17,16 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 
 /**
  * Unit tests for {@link Tag}.
@@ -34,6 +41,8 @@
   private static final String PROJECT_VERSION_KEY = "project.version";
   private static final int PROJECT_VERSION = 1;
 
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
   @Test
   public void testTags() {
     Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
@@ -44,4 +53,16 @@
     Assert.assertEquals(projectVersionTag.getKey(), PROJECT_VERSION_KEY);
     Assert.assertEquals(projectVersionTag.getValue().intValue(), PROJECT_VERSION);
   }
+
+  @Test
+  public void testSerde() throws IOException {
+    Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
+    Tag<Integer> projectVersionTag = new Tag<Integer>(PROJECT_VERSION_KEY, PROJECT_VERSION);
+    List<Tag<?>> tags = Arrays.asList(jobIdTag, projectVersionTag);
+    String bytes = objectMapper.writeValueAsString(tags);
+
+    JavaType type = objectMapper.getTypeFactory().constructCollectionType(List.class, Tag.class);
+    List<Tag<?>> deserTags = objectMapper.readValue(bytes, type);
+    Assert.assertEquals(deserTags, tags);
+  }
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 8fe09e5..db27935 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -135,10 +135,6 @@
 
   @Override
   public void cancel() throws Exception {
-    try {
-      this.gobblinYarnAppLauncher.stop();
-    } finally {
-      super.cancel();
-    }
+    this.gobblinYarnAppLauncher.stop();
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fbd4294..690dc11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1027,7 +1027,7 @@
    * Build the {@link EventSubmitter} for this class.
    */
   private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
-    return new EventSubmitter.Builder(this.runtimeMetricContext, "gobblin.runtime")
+    return new EventSubmitter.Builder(this.runtimeMetricContext, JobMetrics.NAMESPACE)
         .addMetadata(Tag.toMap(Tag.tagValuesToString(tags))).build();
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 7ebb77d..d8ce6f1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -502,7 +502,7 @@
     }
 
     EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(JobMetrics.get(this.jobId, new JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
-        "gobblin.runtime");
+        JobMetrics.NAMESPACE);
     eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState, JobEvent.TASKS_SUBMITTED));
     eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, "tasksCount", Integer.toString(tasks.size()));
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 03146e1..bb8bac9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -42,6 +42,7 @@
  */
 @Slf4j
 public class JobMetrics extends GobblinMetrics {
+  public static final String NAMESPACE = "gobblin.runtime";
 
   public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag( "driver");
   protected final String jobName;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f238ffc..182ce7d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -44,6 +44,18 @@
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
 
   /**
+   * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
+   * during testing
+   *
+   */
+  String GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = PREFIX + "job.metrics.suffix";
+  /**
+   * Default suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
+   * is not empty because temporal is still in alpha stages, and should not accidentally affect a prod job
+   */
+  String DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = "-temporal";
+
+  /**
    * Number of worker processes to spin up per task runner
    * NOTE: If this size is too large, your container can OOM and halt execution unexpectedly. It's recommended not to touch
    * this parameter
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 14a4bac..f7040f7 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,20 +17,24 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
-import io.temporal.client.WorkflowOptions;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import lombok.extern.slf4j.Slf4j;
-
-import com.typesafe.config.ConfigFactory;
 import org.apache.hadoop.fs.Path;
 
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -84,6 +88,12 @@
         wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
       }
       Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));
+
+      wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new State(jobProps)));
+      wuSpec.setMetricsSuffix(this.jobProps.getProperty(
+          GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
+          GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
+
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3b25971..5e3ba3d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,15 +18,23 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -44,6 +52,8 @@
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitsDir;
   @NonNull private Tuning tuning = Tuning.DEFAULT;
+  @NonNull private List<Tag<?>> tags = new ArrayList<>();
+  @NonNull private String metricsSuffix = GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d425bbc..99c4d05 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
 
 
 /** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
@@ -47,7 +48,7 @@
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
+        return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl(), new SubmitGTEActivityImpl() };
     }
 
     @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index ba2ccf9..eccad9b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -19,6 +19,7 @@
 
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
+
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 141f220..531a018 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,7 +16,18 @@
  */
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.jetbrains.annotations.NotNull;
 
 import com.typesafe.config.ConfigFactory;
 
@@ -25,17 +36,26 @@
 import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
-import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 
 
 @Slf4j
@@ -45,6 +65,32 @@
 
   @Override
   public int process(WUProcessingSpec workSpec) {
+    try {
+      EventSubmitterContext eventSubmitterContext = getEventSubmitterContext(workSpec);
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+      try (EventTimer timer = timerFactory.createJobTimer()) {
+        return performWork(workSpec);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @NotNull
+  private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec workSpec)
+      throws IOException {
+    // NOTE: We are using the metrics tags from Job Props to create the metric context for the timer and NOT
+    // the deserialized jobState from HDFS that is created by the real distcp job. This is because the AZ runtime
+    // settings we want are for the job launcher that launched this Yarn job.
+    FileSystem fs = Help.loadFileSystemForce(workSpec);
+    JobState jobState = Help.loadJobStateUncached(workSpec, fs);
+    List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
+    String metricsSuffix = workSpec.getMetricsSuffix();
+    List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
+    return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+  }
+
+  private int performWork(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
     int workunitsProcessed = processingWorkflow.performWorkload(
@@ -85,4 +131,34 @@
 
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
   }
+
+  private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String metricsSuffix, JobState jobStateFromHdfs) {
+    // Construct new tags list by combining subset of tags on HDFS job state and the rest of the fields from the current job
+    Map<String, Tag<?>> tagsMap = new HashMap<>();
+    Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+        TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+    // Step 1, Add tags from the AZ props using the original job (the one that launched this yarn app)
+    tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+    // Step 2. Add tags from the jobState (the original MR job on HDFS)
+    List<String> targetKeysToAddSuffix = Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+        .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+        .forEach(tag -> {
+          // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND FLOW_GROUP_FIELDS to prevent collisions when testing
+          String value = targetKeysToAddSuffix.contains(tag.getKey())
+              ? tag.getValue() + metricsSuffix
+              : String.valueOf(tag.getValue());
+          tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+        });
+
+    // Step 3: Overwrite any pre-existing metadata with name of the current caller
+    tagsMap.put(GobblinMetricsKeys.CLASS_META, new Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+    return new ArrayList<>(tagsMap.values());
+  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index c6478b9..f7d7255 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -23,10 +23,11 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
 
 import io.temporal.client.WorkflowClient;
 import io.temporal.serviceclient.WorkflowServiceStubs;
-import lombok.extern.slf4j.Slf4j;
+import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -55,8 +56,9 @@
  * </p>
  */
 @Alpha
-@Slf4j
 public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
+  private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
+
   protected WorkflowServiceStubs workflowServiceStubs;
   protected WorkflowClient client;
   protected String queueName;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 0dcf19a..6bef7a6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -25,7 +25,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +32,7 @@
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Promise;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -109,7 +109,7 @@
     String thisWorkflowId = Workflow.getInfo().getWorkflowId();
     String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + childAddr;
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
-        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setWorkflowId(childWorkflowId)
         .build();
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
index 2bcf421..c653ce3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
@@ -20,13 +20,19 @@
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
 @WorkflowInterface
 public interface GreetingWorkflow {
 
     /**
      * This is the method that is executed when the Workflow Execution is started. The Workflow
      * Execution completes when this method finishes execution.
+     *
+     * This method also shows an example of metrics emission using the {@link EventSubmitter} seen in
+     * non-Temporal Gobblin code.
      */
     @WorkflowMethod
-    String getGreeting(String name);
+    String getGreeting(String name, EventSubmitterContext eventSubmitterContext);
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 774b77e..9d26360 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -19,11 +19,19 @@
 
 import java.time.Duration;
 
+import org.slf4j.Logger;
+
 import io.temporal.activity.ActivityOptions;
 import io.temporal.workflow.Workflow;
 
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
 public class GreetingWorkflowImpl implements GreetingWorkflow {
 
+    private final Logger LOG = Workflow.getLogger(GreetingWorkflowImpl.class);
+
     /*
      * At least one of the following options needs to be defined:
      * - setStartToCloseTimeout
@@ -41,16 +49,19 @@
      *
      * The activity options that were defined above are passed in as a parameter.
      */
-    private final FormatActivity activity = Workflow.newActivityStub(FormatActivity.class, options);
+    private final FormatActivity formatActivity = Workflow.newActivityStub(FormatActivity.class, options);
 
     // This is the entry point to the Workflow.
     @Override
-    public String getGreeting(String name) {
-
+    public String getGreeting(String name, EventSubmitterContext eventSubmitterContext) {
         /**
-         * If there were other Activity methods they would be orchestrated here or from within other Activities.
-         * This is a blocking call that returns only after the activity has completed.
+         * Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
          */
-        return activity.composeGreeting(name);
+        TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+        try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
+            LOG.info("Executing getGreeting");
+            timer.addMetadata("name", name);
+            return formatActivity.composeGreeting(name);
+        }
     }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 33183b6..5d196fa 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -33,6 +33,7 @@
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
 /**
@@ -56,6 +57,9 @@
   public void submitJob(List<WorkUnit> workunits) {
     WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
     GreetingWorkflow greetingWorkflow = this.client.newWorkflowStub(GreetingWorkflow.class, options);
-    greetingWorkflow.getGreeting("Gobblin");
+    EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(this.eventSubmitter);
+
+    String greeting = greetingWorkflow.getGreeting("Gobblin", eventSubmitterContext);
+    log.info(greeting);
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
index 274daee..d5d01cc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
@@ -22,6 +22,7 @@
 import io.temporal.client.WorkflowClient;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
 
 
 public class HelloWorldWorker extends AbstractTemporalWorker {
@@ -31,11 +32,16 @@
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { GreetingWorkflowImpl.class };
+        return new Class[] {
+            GreetingWorkflowImpl.class,
+        };
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new FormatActivityImpl() };
+        return new Object[] {
+            new FormatActivityImpl(),
+            new SubmitGTEActivityImpl()
+        };
     }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
new file mode 100644
index 0000000..f0db7e7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+
+/**
+ * Wrapper for sending the core essence of an {@link EventSubmitter} over the wire (e.g. metadata tags, namespace)
+ * This is in lieu of sending the entire {@link EventSubmitter} object over the wire, which is not serializable without
+ * losing some information, such as the gauges
+ */
+@Getter
+public class EventSubmitterContext {
+  private final List<Tag<?>> tags;
+  private final String namespace;
+  private final Class callerClass;
+
+  @JsonCreator
+  private EventSubmitterContext(
+      @JsonProperty("tags") List<Tag<?>> tags,
+      @JsonProperty("namespace") String namespace,
+      @JsonProperty("callerClass") Class callerClass) {
+    this.tags = tags;
+    this.namespace = namespace;
+    this.callerClass = callerClass;
+  }
+
+  public EventSubmitterContext(List<Tag<?>> tags, String namespace) {
+    // Explicitly send class over the wire to avoid any classloader issues
+    this(tags, namespace, tags.stream()
+        .filter(tag -> tag.getKey().equals(CLASS_META))
+        .findAny()
+        .map(tag -> (String) tag.getValue())
+        .map(EventSubmitterContext::resolveClass)
+        .orElse(EventSubmitterContext.class));
+  }
+
+  public EventSubmitterContext(EventSubmitter eventSubmitter) {
+    this(eventSubmitter.getTags(), eventSubmitter.getNamespace());
+  }
+
+  public EventSubmitter create() {
+    MetricContext metricContext = Instrumented.getMetricContext(new State(), callerClass, tags);
+    return new EventSubmitter.Builder(metricContext, namespace).build();
+  }
+
+  private static Class resolveClass(String canonicalClassName) {
+    try {
+      return Class.forName(canonicalClassName);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
new file mode 100644
index 0000000..677baf4
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.Closeable;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * <p> A timer that can be used to track the duration of an event. This event differs from the {@link TimingEvent} in that
+ * this class is not meant to be used outside of {@link io.temporal.workflow.Workflow} code. We cannot use {@link TimingEvent}
+ * because it is not serializable and cannot be passed to {@link io.temporal.workflow.Workflow} code due to the
+ * {@link EventSubmitter} field. It also relies on {@link System#currentTimeMillis()} which not compatible with {@link io.temporal.workflow.Workflow}
+ * since {@link System#currentTimeMillis()} is not deterministic.</p>
+ *
+ * <p> {@link EventSubmitter} is not easily serializable because the {@link MetricContext} field
+ * contains bi-directional relationships via the {@link org.apache.gobblin.metrics.InnerGauge}. Although it's possible
+ * to write a custom serializer for {@link EventSubmitter}, it creates a non-obvious sleight of hand where the EventSubmitter
+ * metadata will change when crossing {@link io.temporal.workflow.Workflow} or {@link io.temporal.activity.Activity} boundaries. </p>
+ *
+ * <p> It differs from {@link Closeable} because the close method does not throw {@link java.io.IOException}. {@link TimingEvent}
+ * does this but the issue is it does not implement an interface. Inheritance is not a good solution either because of the
+ * {@link EventSubmitter} member variable. </p>
+ */
+public interface EventTimer extends Closeable {
+  /**
+   * Add additional metadata that will be used for post-processing when the timer is stopped via {@link #stop()}
+   * @param key
+   * @param metadata
+   */
+  void addMetadata(String key, String metadata);
+
+  /**
+   * Stops the timer and execute any post-processing (e.g. event submission)
+   */
+  void stop();
+
+  default void close() {
+    stop();
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
new file mode 100644
index 0000000..aa975d1
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+@ActivityInterface
+public interface SubmitGTEActivity {
+    @ActivityMethod
+    void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
new file mode 100644
index 0000000..63bce42
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import org.slf4j.Logger;
+
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+public class SubmitGTEActivityImpl implements SubmitGTEActivity {
+    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
+
+    @Override
+    public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
+        eventSubmitterContext.create().submit(eventBuilder);
+    }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
new file mode 100644
index 0000000..0f68f11
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.workflow.Workflow;
+import lombok.RequiredArgsConstructor;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * Boiler plate for tracking elapsed time of events that is compatible with {@link Workflow}
+ * by using activities to record time
+ *
+ * This class is very similar to {@link TimingEvent} but uses {@link Workflow} compatible APIs. It's possible to refactor
+ * this class to inherit the {@link TimingEvent} but extra care would be needed to remove the {@link EventSubmitter} field
+ * since that class is not serializable without losing some information
+ */
+@RequiredArgsConstructor
+public class TemporalEventTimer implements EventTimer {
+  private final SubmitGTEActivity trackingEventActivity;
+  private final GobblinEventBuilder eventBuilder;
+  private final EventSubmitterContext eventSubmitterContext;
+  private final Instant startTime;
+
+  @Override
+  public void stop() {
+    stop(getCurrentTime());
+  }
+
+  @Override
+  public void addMetadata(String key, String metadata) {
+    this.eventBuilder.addMetadata(key, metadata);
+  }
+
+
+  private void stop(Instant endTime) {
+    this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, TimingEvent.METADATA_TIMING_EVENT);
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, Long.toString(this.startTime.toEpochMilli()));
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_END_TIME, Long.toString(endTime.toEpochMilli()));
+    Duration duration = Duration.between(this.startTime, endTime);
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_DURATION, Long.toString(duration.toMillis()));
+
+    trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
+  }
+
+  private static Instant getCurrentTime() {
+    return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+  }
+
+  public static class Factory {
+    private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder().build();
+    private final SubmitGTEActivity submitGTEActivity;
+    private final EventSubmitterContext eventSubmitterContext;
+
+    public Factory(EventSubmitterContext eventSubmitterContext) {
+      this(eventSubmitterContext, DEFAULT_OPTS);
+    }
+
+    public Factory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) {
+      this.submitGTEActivity = Workflow.newActivityStub(SubmitGTEActivity.class, opts);
+      this.eventSubmitterContext = eventSubmitterContext;
+    }
+
+    public TemporalEventTimer create(String eventName, Instant startTime) {
+      GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName, eventSubmitterContext.getNamespace());
+      return new TemporalEventTimer(submitGTEActivity, eventBuilder, this.eventSubmitterContext, startTime);
+    }
+
+    public TemporalEventTimer create(String eventName) {
+      return create(eventName, getCurrentTime());
+    }
+
+    /**
+     * Utility for creating a timer that emits separate events at the start and end of a job. This imitates the behavior in
+     * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events that are compatible with the
+     * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to update GaaS flow statuses
+     *
+     * @return a timer that emits an event at the beginning of the job and a completion event ends at the end of the job
+     */
+    public TemporalEventTimer createJobTimer() {
+      TemporalEventTimer startTimer = create(TimingEvent.LauncherTimings.JOB_START);
+      startTimer.stop(Instant.EPOCH); // Emit start job event containing a stub end time
+      return create(TimingEvent.LauncherTimings.JOB_COMPLETE, startTimer.startTime);
+    }
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 829b97e..ab69545 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -17,21 +17,6 @@
 
 package org.apache.gobblin.temporal.yarn;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,32 +35,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
-import lombok.AccessLevel;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
-import org.apache.gobblin.metrics.MultiReporterException;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
-import org.apache.gobblin.yarn.GobblinYarnEventConstants;
-import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
-import org.apache.gobblin.yarn.YarnHelixUtils;
-import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
-import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
-import org.apache.gobblin.yarn.event.NewContainerRequest;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -107,6 +66,49 @@
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
 
 /**
  * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
@@ -683,18 +685,10 @@
       LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
       this.unusedHelixInstanceNames.add(completedInstanceName);
 
-      if (this.eventSubmitter.isPresent()) {
-        this.eventSubmitter.get()
-            .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
-      }
+      // NOTE: logic for handling container failure is removed because original implementation relies on the auto scaling manager
+      // to control the number of containers by polling helix for the current number of tasks
+      // Without that integration, that code requests too many containers when there are exceptions and overloads yarn
     }
-    Optional<Resource> newContainerResource = completedContainerInfo != null ?
-        Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
-    LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
-        containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
-    this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
-            Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
   }
 
   private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo,
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
new file mode 100644
index 0000000..91c960e
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+public class EventSubmitterContextTest {
+  private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final String NAMESPACE = "test-namespace";
+  @Test
+  public void testCreateEventSubmitter()
+      throws IOException {
+    List<Tag<?>> tags = Arrays.asList(new Tag<>("jobId", "stub"));
+    State state = new State();
+    state.setProp("someState", "stub");
+    MetricContext metricContext = Instrumented.getMetricContext(state, getClass(), tags);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, NAMESPACE).build();
+    EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(eventSubmitter);
+    byte[] asBytes = OBJECT_MAPPER.writeValueAsBytes(eventSubmitterContext);
+
+    EventSubmitterContext deserEventMetadata = OBJECT_MAPPER.readValue(asBytes, EventSubmitterContext.class);
+    EventSubmitter deserEventSubmitter = deserEventMetadata.create();
+    Assert.assertTrue(deserEventSubmitter.getTags().contains(tags.get(0)));
+    Assert.assertEquals(deserEventSubmitter.getNamespace(), NAMESPACE);
+
+    Map<? extends String, String> tagMap = Tag.toMap(Tag.tagValuesToString(eventSubmitter.getTags()));
+    Assert.assertEquals(tagMap.get(CLASS_META), this.getClass().getName());
+    Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_ID_TAG_NAME));
+    Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_NAME_TAG_NAME));
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 391b160..d76405a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -104,11 +104,14 @@
 import org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.rest.JobExecutionInfoServer;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.util.AzkabanTags;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.EmailUtils;
@@ -121,6 +124,7 @@
 import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
 import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
 
+import static org.apache.gobblin.metrics.GobblinMetrics.METRICS_STATE_CUSTOM_TAGS;
 import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
 
 
@@ -1051,8 +1055,15 @@
       Schema schema = KafkaReporterUtils.getMetricReportSchema();
       String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get());
       LOGGER.info("Adding schemaId {} for MetricReport to the config", schemaId);
-      config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
-          ConfigValueFactory.fromAnyRef(schemaId));
+      List<Tag<?>> tags = Lists.newArrayList();
+      tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+      GobblinMetrics.addCustomTagsToProperties(properties, tags);
+
+      config = config
+          .withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+              ConfigValueFactory.fromAnyRef(schemaId))
+          .withValue(METRICS_STATE_CUSTOM_TAGS,
+              ConfigValueFactory.fromAnyRef(properties.getProperty(METRICS_STATE_CUSTOM_TAGS)));
     }
     return config;
   }