[HUDI-1233] Report Kafka consumption delay as a DeltaStreamer metric (#2074)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index ac48805..0f4a64c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -35,12 +35,15 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
@@ -93,8 +96,16 @@
   private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
 
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
+                                    SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException {
+
     try {
+      if (JsonKafkaSource.class.getName().equals(sourceClass)
+          || AvroKafkaSource.class.getName().equals(sourceClass)) {
+        return (Source) ReflectionUtils.loadClass(sourceClass,
+            new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class},
+            cfg, jssc, sparkSession, schemaProvider, metrics);
+      }
+
       return (Source) ReflectionUtils.loadClass(sourceClass,
           new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
           jssc, sparkSession, schemaProvider);
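Note on the dispatch above: the two Kafka sources are matched by exact fully qualified class name, so only they receive the metrics handle; every other source class, including any subclass of JsonKafkaSource or AvroKafkaSource, still resolves through the original four-argument constructor. A minimal usage sketch (illustrative only; `props`, `jssc`, `sparkSession`, `schemaProvider`, and `metrics` are assumed to be initialized elsewhere):

```java
// Kafka sources take the five-argument constructor and report delay metrics.
Source kafkaSource = UtilHelpers.createSource(
    JsonKafkaSource.class.getName(), props, jssc, sparkSession, schemaProvider, metrics);

// Any other class name falls through to the original four-argument path, so
// existing Source implementations stay source-compatible with this change.
Source dfsSource = UtilHelpers.createSource(
    "org.apache.hudi.utilities.sources.JsonDFSSource", props, jssc, sparkSession, schemaProvider, metrics);
```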
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 2b0de93..afe6862 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -79,7 +79,6 @@
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
-
 import java.util.stream.Collectors;
 
 import scala.collection.JavaConversions;
@@ -171,6 +170,8 @@
    */
   private transient HoodieWriteClient writeClient;
 
+  private transient HoodieDeltaStreamerMetrics metrics;
+
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                    Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
@@ -190,8 +191,10 @@
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
+    this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
+
     this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics));
     this.conf = conf;
   }
 
@@ -226,7 +229,6 @@
    */
   public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
     Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
-    HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
     Timer.Context overallTimerContext = metrics.getOverallTimerContext();
 
     // Refresh Timeline
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index 8e3d253..0567137 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -89,6 +89,12 @@
     }
   }
 
+  public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
+    if (config.isMetricsOn()) {
+      Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
+    }
+  }
+
   public long getDurationInMs(long ctxDuration) {
     return ctxDuration / 1000000;
   }
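The gauge is refreshed on every call, and the `isMetricsOn()` guard makes the method a no-op when metrics are disabled, so callers never need to guard it themselves. A minimal sketch of how it surfaces, assuming Hudi metrics are enabled in the writer config (`hoodie.metrics.on=true` plus a reporter such as `hoodie.metrics.reporter.type=GRAPHITE`):

```java
// Illustrative only: each sync round pushes the current lag into the gauge.
// With metrics on, the value is published under the name built by
// getMetricsName("deltastreamer", "kafkaDelayCount"), i.e. shaped roughly
// like <tableName>.deltastreamer.kafkaDelayCount on the configured reporter.
long lagInMessages = 1_250L; // hypothetical lag computed for this round
metrics.updateDeltaStreamerKafkaDelayCountMetrics(lagInMessages);
```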
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 66a38e2..256516b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -45,9 +46,12 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final HoodieDeltaStreamerMetrics metrics;
+
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+      SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    this.metrics = metrics;
     props.put("key.deserializer", StringDeserializer.class);
     props.put("value.deserializer", KafkaAvroDeserializer.class);
     offsetGen = new KafkaOffsetGen(props);
@@ -55,7 +59,7 @@
 
   @Override
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
-    OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
+    OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 833c6c6..cedaba4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -43,9 +44,12 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final HoodieDeltaStreamerMetrics metrics;
+
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+      SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(properties, sparkContext, sparkSession, schemaProvider);
+    this.metrics = metrics;
     properties.put("key.deserializer", StringDeserializer.class);
     properties.put("value.deserializer", StringDeserializer.class);
     offsetGen = new KafkaOffsetGen(properties);
@@ -53,7 +57,7 @@
 
   @Override
   protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
-    OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
+    OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index e958f85..fc7ba79 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -166,7 +167,7 @@
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
   }
 
-  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
+  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
 
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
@@ -183,6 +184,7 @@
       // Determine the offset ranges to read from
       if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
         fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
+        metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
       } else {
         KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
                 .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -233,6 +235,18 @@
     return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
+  private long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
+    long delayCount = 0L;
+    Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    Map<TopicPartition, Long> lastOffsets = consumer.endOffsets(topicPartitions);
+
+    for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) {
+      long committedOffset = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
+      delayCount += Math.max(entry.getValue() - committedOffset, 0L);
+    }
+    return delayCount;
+  }
+
   /**
    * Check if topic exists.
    * @param consumer kafka consumer
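`delayOffsetCalculation` defines the delay as the total number of messages the streamer still has to catch up on: per partition, the broker's end offset minus the checkpointed offset, clamped at zero, with partitions absent from the checkpoint defaulting to zero, i.e. counted as fully unread. A self-contained sketch of the same arithmetic, with plain string labels standing in for Kafka's `TopicPartition`:

```java
import java.util.HashMap;
import java.util.Map;

// Worked example of the kafkaDelayCount arithmetic (illustrative only).
public class KafkaDelayCountSketch {
  public static void main(String[] args) {
    // Broker-side end offsets, as returned by consumer.endOffsets(...).
    Map<String, Long> endOffsets = new HashMap<>();
    endOffsets.put("impressions-0", 100L);
    endOffsets.put("impressions-1", 50L);

    // Offsets from the last DeltaStreamer checkpoint; partition 1 is
    // missing, so it defaults to 0 and counts as entirely unconsumed.
    Map<String, Long> checkpointOffsets = new HashMap<>();
    checkpointOffsets.put("impressions-0", 90L);

    long delayCount = 0L;
    for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
      long committed = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
      delayCount += Math.max(entry.getValue() - committed, 0L);
    }
    // (100 - 90) + (50 - 0) = 60 messages behind the head of the topic.
    System.out.println("kafkaDelayCount = " + delayCount);
  }
}
```

Clamping at zero guards against checkpoints that sit ahead of the broker's end offsets (for example after a topic is recreated), which would otherwise drive the gauge negative.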
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index ea77e26..e8cb2a6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -46,6 +47,7 @@
 import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests against {@link AvroKafkaSource}.
@@ -56,6 +58,7 @@
 
   private FilebasedSchemaProvider schemaProvider;
   private KafkaTestUtils testUtils;
+  private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
 
   @BeforeAll
   public static void initClass() throws Exception {
@@ -101,7 +104,7 @@
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(null, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
@@ -149,11 +152,11 @@
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
 
     TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
-    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider);
+    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
 
     TypedProperties latestProps = createPropsForJsonSource(null, "latest");
-    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider);
+    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
 
     // 1. Extract with a none data kafka checkpoint
@@ -181,7 +184,7 @@
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     Config.maxEventsFromKafkaSource = 500;
 
@@ -210,7 +213,7 @@
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     Config.maxEventsFromKafkaSource = 500;
 
@@ -242,7 +245,7 @@
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(500L, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
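Since these tests inject a Mockito mock for the metrics object, a follow-up assertion could also verify that the delay gauge actually fires once a non-empty checkpoint is supplied. A hedged sketch, not part of this change (the `lastCheckpoint` variable is hypothetical):

```java
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;

// After fetching with a real checkpoint, getNextOffsetRanges should have
// pushed exactly one delay-count update into the mocked metrics object.
kafkaSource.fetchNewDataInAvroFormat(Option.of(lastCheckpoint), Long.MAX_VALUE);
verify(metrics).updateDeltaStreamerKafkaDelayCountMetrics(anyLong());
```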