[HUDI-1233] Deltastreamer Kafka consumption delay reporting indicators (#2074)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index ac48805..0f4a64c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -35,12 +35,15 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.hudi.utilities.transform.ChainedTransformer;
@@ -93,8 +96,16 @@
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
- SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
+ SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException {
+
try {
+ if (JsonKafkaSource.class.getName().equals(sourceClass)
+ || AvroKafkaSource.class.getName().equals(sourceClass)) {
+ return (Source) ReflectionUtils.loadClass(sourceClass,
+ new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg,
+ jssc, sparkSession, schemaProvider, metrics);
+ }
+
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
jssc, sparkSession, schemaProvider);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 2b0de93..afe6862 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -79,7 +79,6 @@
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
@@ -171,6 +170,8 @@
*/
private transient HoodieWriteClient writeClient;
+ private transient HoodieDeltaStreamerMetrics metrics;
+
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
@@ -190,8 +191,10 @@
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
+ this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
+
this.formatAdapter = new SourceFormatAdapter(
- UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+ UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics));
this.conf = conf;
}
@@ -226,7 +229,6 @@
*/
public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
- HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
Timer.Context overallTimerContext = metrics.getOverallTimerContext();
// Refresh Timeline
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index 8e3d253..0567137 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -89,6 +89,12 @@
}
}
+ public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
+ if (config.isMetricsOn()) {
+ Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
+ }
+ }
+
public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 66a38e2..256516b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,6 +20,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -45,9 +46,12 @@
private final KafkaOffsetGen offsetGen;
+ private final HoodieDeltaStreamerMetrics metrics;
+
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
+ this.metrics = metrics;
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
offsetGen = new KafkaOffsetGen(props);
@@ -55,7 +59,7 @@
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
- OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
+ OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 833c6c6..cedaba4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -20,6 +20,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -43,9 +44,12 @@
private final KafkaOffsetGen offsetGen;
+ private final HoodieDeltaStreamerMetrics metrics;
+
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(properties, sparkContext, sparkSession, schemaProvider);
+ this.metrics = metrics;
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
@@ -53,7 +57,7 @@
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
- OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
+ OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index e958f85..fc7ba79 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -24,6 +24,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -166,7 +167,7 @@
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
}
- public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
+ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
// Obtain current metadata for the topic
Map<TopicPartition, Long> fromOffsets;
@@ -183,6 +184,7 @@
// Determine the offset ranges to read from
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
+ metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -233,6 +235,18 @@
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
+ private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
+ Long delayCount = 0L;
+ Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+ Map<TopicPartition, Long> lastOffsets = consumer.endOffsets(topicPartitions);
+
+ for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) {
+ Long offect = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
+ delayCount += entry.getValue() - offect > 0 ? entry.getValue() - offect : 0L;
+ }
+ return delayCount;
+ }
+
/**
* Check if topic exists.
* @param consumer kafka consumer
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index ea77e26..e8cb2a6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -46,6 +47,7 @@
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
/**
* Tests against {@link AvroKafkaSource}.
@@ -56,6 +58,7 @@
private FilebasedSchemaProvider schemaProvider;
private KafkaTestUtils testUtils;
+ private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
@BeforeAll
public static void initClass() throws Exception {
@@ -101,7 +104,7 @@
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
- Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
@@ -149,11 +152,11 @@
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
- Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider);
+ Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
TypedProperties latestProps = createPropsForJsonSource(null, "latest");
- Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider);
+ Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
// 1. Extract with a none data kafka checkpoint
@@ -181,7 +184,7 @@
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
- Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
Config.maxEventsFromKafkaSource = 500;
@@ -210,7 +213,7 @@
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
- Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
Config.maxEventsFromKafkaSource = 500;
@@ -242,7 +245,7 @@
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(500L, "earliest");
- Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit