Allow user to set group.id for Kafka ingestion task (#11147)

* allow user to set group.id for Kafka ingestion task

* fix test coverage by removing deprecated code and add doc

* fix typo

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: frank chen <frankchen@apache.org>

Co-authored-by: frank chen <frankchen@apache.org>
diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 0dd3167..e272bc1 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -40,6 +40,8 @@
 > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka.
 > Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
 
+> If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id.
+
 ## Tutorial
 
 This page contains reference documentation for Apache Kafka-based ingestion.
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index 365be19..9ba3123 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -19,9 +19,6 @@
 
 package org.apache.druid.indexing.kafka;
 
-import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.java.util.common.StringUtils;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -35,7 +32,6 @@
   {
     final Map<String, Object> props = new HashMap<>();
     props.put("metadata.max.age.ms", "10000");
-    props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
     return props;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index bd98510..623379a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -30,15 +30,9 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
 {
@@ -84,44 +78,6 @@
     return pollRetryMs;
   }
 
-  @Deprecated
-  KafkaConsumer<byte[], byte[]> newConsumer()
-  {
-    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
-      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
-      final Properties props = new Properties();
-      KafkaRecordSupplier.addConsumerPropertiesFromConfig(
-          props,
-          configMapper,
-          ioConfig.getConsumerProperties()
-      );
-      props.putIfAbsent("isolation.level", "read_committed");
-      props.putAll(consumerConfigs);
-
-      return new KafkaConsumer<>(props);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(currCtxCl);
-    }
-  }
-
-  @Deprecated
-  static void assignPartitions(
-      final KafkaConsumer consumer,
-      final String topic,
-      final Set<Integer> partitions
-  )
-  {
-    consumer.assign(
-        new ArrayList<>(
-            partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList())
-        )
-    );
-  }
-
   @Override
   protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
   {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 1397cdd..fe32ffe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -29,6 +30,7 @@
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -255,6 +257,7 @@
     final Properties props = new Properties();
     addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
     props.putIfAbsent("isolation.level", "read_committed");
+    props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
     props.putAll(consumerConfigs);
 
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
diff --git a/website/.spelling b/website/.spelling
index bce45ee..1219f56 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -22,6 +22,7 @@
 500MiB
 64-bit
 ACL
+ACLs
 APIs
 AvroStorage
 AWS