Emit job metrics at RM level (#4143)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d33a99c..b46ab75 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -184,6 +184,7 @@
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
+ public static final String RM_HOST_KEY = "hadoop.resource.manager.rpc";
/**
* Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 879410c..3d68a5b 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -92,6 +92,7 @@
public static final String JOB_TAG_FIELD = "jobTag";
public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+ public static final String RM_HOST_FIELD = "rmHost";
public static final String LOW_WATERMARK_FIELD = "lowWatermark";
public static final String HIGH_WATERMARK_FIELD = "highWatermark";
public static final String PROCESSED_COUNT_FIELD = "processedCount";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7809e78..e3c729a 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -30,6 +30,8 @@
public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = "dataQualityNonEvaluatedFileCount";
public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
public static final String DATA_QUALITY_BYTES_WRITTEN = "dataQualityBytesWritten";
+ // RM metric names
+ public static final String RM_JOB_OBSERVED_COUNT = "rmJobObservedCount";
// Flow Compilation Meters and Timer
public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c624683..9b35921 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -32,6 +32,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +55,9 @@
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -80,6 +85,8 @@
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+import static org.apache.gobblin.runtime.JobState.GAAS_OBSERVABILITY_METRICS_GROUPNAME;
+
@Slf4j
@@ -142,6 +149,8 @@
// TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));
+ // emit jobs observed at RM level
+ emitMetrics(jobState);
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
@@ -354,4 +363,37 @@
public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
+
+ /**
+ * Emit metrics to indicate jobs observed at RM level
+ * @param jobState job state
+ */
+ private void emitMetrics(JobState jobState) {
+ try {
+ OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics == null) {
+ log.warn("OpenTelemetry metrics instance is null, skipping metrics emission");
+ return;
+ }
+
+ Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+ Attributes tags = getEventAttributes(jobState);
+ log.info("Emitting metrics for job: {}", jobState.getJobName());
+ String jobMetricDescription = "Number of Jobs observed on RM";
+ String jobMetricName = ServiceMetricNames.RM_JOB_OBSERVED_COUNT;
+ meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1, tags);
+ } catch (Exception e) {
+ log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+ }
+ }
+
+ private Attributes getEventAttributes(JobState jobState) {
+ return Attributes.builder()
+ .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_FABRIC, jobState.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC, "NA"))
+ .put(TimingEvent.FlowEventConstants.RM_HOST_FIELD, jobState.getProp(ConfigurationKeys.RM_HOST_KEY, "NA"))
+ .build();
+ }
}