[GOBBLIN-1400] Added comments to schemaCheck methods
Closes #3237 from vikrambohra/addComments
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 9e14539..2a0bc2b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -84,6 +84,14 @@
this.schemaCheckEnabled = ConfigUtils.getBoolean(config, CONFIG_ENABLE_SCHEMA_CHECK, ENABLE_SCHEMA_CHECK_DEFAULT);
}
+ /**
+ * Filter topics based on whitelist and blacklist patterns
+ * and if {@link #schemaCheckEnabled}, also filter on whether schema is present in schema registry
+ * @param blacklist - List of regex patterns that need to be blacklisted
+ * @param whitelist - List of regex patterns that need to be whitelisted
+ *
+ * @return
+ */
@Override
public List<KafkaTopic> getFilteredTopics(final List<Pattern> blacklist, final List<Pattern> whitelist) {
return Lists.newArrayList(Iterables.filter(getTopics(), new Predicate<KafkaTopic>() {
@@ -101,6 +109,11 @@
return this.schemaRegistry.isPresent();
}
+ /**
+ * accept topic if {@link #schemaCheckEnabled} and schema registry is configured
+ * @param topic
+ * @return
+ */
private boolean isSchemaPresent(String topic) {
if(this.schemaCheckEnabled && isSchemaRegistryConfigured()) {
try {