Emit job metrics at RM level (#4143)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d33a99c..b46ab75 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -184,6 +184,7 @@
   public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
   public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
   public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
+  public static final String RM_HOST_KEY = "hadoop.resource.manager.rpc";
 
   /**
    * Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 879410c..3d68a5b 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -92,6 +92,7 @@
     public static final String JOB_TAG_FIELD = "jobTag";
     public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
     public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+    public static final String RM_HOST_FIELD = "rmHost";
     public static final String LOW_WATERMARK_FIELD = "lowWatermark";
     public static final String HIGH_WATERMARK_FIELD = "highWatermark";
     public static final String PROCESSED_COUNT_FIELD = "processedCount";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7809e78..e3c729a 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -30,6 +30,8 @@
   public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = "dataQualityNonEvaluatedFileCount";
   public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
   public static final String DATA_QUALITY_BYTES_WRITTEN = "dataQualityBytesWritten";
+  // RM metric names
+  public static final String RM_JOB_OBSERVED_COUNT = "rmJobObservedCount";
 
   // Flow Compilation Meters and Timer
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c624683..9b35921 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -32,6 +32,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,6 +55,9 @@
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -80,6 +85,8 @@
 import org.apache.gobblin.writer.initializer.WriterInitializer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
+import static org.apache.gobblin.runtime.JobState.GAAS_OBSERVABILITY_METRICS_GROUPNAME;
+
 
 
 @Slf4j
@@ -142,6 +149,8 @@
     // TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
     JobState jobState = new JobState(jobProps);
     log.info("Created jobState: {}", jobState.toJsonString(true));
+    // emit jobs observed at RM level
+    emitMetrics(jobState);
 
     int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
     heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
@@ -354,4 +363,37 @@
   public static int getConfiguredNumSizeSummaryQuantiles(State state) {
     return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
   }
+
+  /**
+   * Emit metrics to indicate jobs observed at RM level
+   * @param jobState job state
+   */
+  private void emitMetrics(JobState jobState) {
+    try {
+      OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics == null) {
+        log.warn("OpenTelemetry metrics instance is null, skipping metrics emission");
+        return;
+      }
+
+      Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+      Attributes tags = getEventAttributes(jobState);
+      log.info("Emitting metrics for job: {}", jobState.getJobName());
+      String jobMetricDescription = "Number of Jobs observed on RM";
+      String jobMetricName = ServiceMetricNames.RM_JOB_OBSERVED_COUNT;
+      meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1, tags);
+    } catch (Exception e) {
+      log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+    }
+  }
+
+  private Attributes getEventAttributes(JobState jobState) {
+    return Attributes.builder()
+        .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_FABRIC, jobState.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC, "NA"))
+        .put(TimingEvent.FlowEventConstants.RM_HOST_FIELD, jobState.getProp(ConfigurationKeys.RM_HOST_KEY, "NA"))
+        .build();
+  }
 }