Allow user to set group.id for Kafka ingestion task (#11147)
* allow user to set group.id for Kafka ingestion task
* fix test coverage by removing deprecated code and add doc
* fix typo
* Update docs/development/extensions-core/kafka-ingestion.md
Co-authored-by: frank chen <frankchen@apache.org>
Co-authored-by: frank chen <frankchen@apache.org>
diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 0dd3167..e272bc1 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -40,6 +40,8 @@
> In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka.
> Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
+> If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id.
+
## Tutorial
This page contains reference documentation for Apache Kafka-based ingestion.
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index 365be19..9ba3123 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -19,9 +19,6 @@
package org.apache.druid.indexing.kafka;
-import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.java.util.common.StringUtils;
-
import java.util.HashMap;
import java.util.Map;
@@ -35,7 +32,6 @@
{
final Map<String, Object> props = new HashMap<>();
props.put("metadata.max.age.ms", "10000");
- props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");
return props;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index bd98510..623379a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -30,15 +30,9 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
@@ -84,44 +78,6 @@
return pollRetryMs;
}
- @Deprecated
- KafkaConsumer<byte[], byte[]> newConsumer()
- {
- ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
- final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
- final Properties props = new Properties();
- KafkaRecordSupplier.addConsumerPropertiesFromConfig(
- props,
- configMapper,
- ioConfig.getConsumerProperties()
- );
- props.putIfAbsent("isolation.level", "read_committed");
- props.putAll(consumerConfigs);
-
- return new KafkaConsumer<>(props);
- }
- finally {
- Thread.currentThread().setContextClassLoader(currCtxCl);
- }
- }
-
- @Deprecated
- static void assignPartitions(
- final KafkaConsumer consumer,
- final String topic,
- final Set<Integer> partitions
- )
- {
- consumer.assign(
- new ArrayList<>(
- partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList())
- )
- );
- }
-
@Override
protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
{
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 1397cdd..fe32ffe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -29,6 +30,7 @@
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -255,6 +257,7 @@
final Properties props = new Properties();
addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
props.putIfAbsent("isolation.level", "read_committed");
+ props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.putAll(consumerConfigs);
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
diff --git a/website/.spelling b/website/.spelling
index bce45ee..1219f56 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -22,6 +22,7 @@
500MiB
64-bit
ACL
+ACLs
APIs
AvroStorage
AWS