Dynamic auto scale Kinesis-Stream ingest tasks (#10985)

* ready to test

* revert misc.xml

* document kinesis md

* Update docs/development/extensions-core/kafka-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update kafka-ingestion.md

remove leading `

* Update kinesis-ingestion.md

add missing `

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 6fa2262..dcebce0 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -147,7 +147,7 @@
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
-|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
+|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|
 
 ### Task Autoscaler Properties
 
diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md
index fcc4f64..8066d6b 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -146,6 +146,116 @@
 |`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
 |`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
 |`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no|
+|`autoScalerConfig`|Object|Defines auto scaling behavior for Kinesis ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|
+
+### Task Autoscaler Properties
+
+> Note that Task AutoScaler is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or or absent Druid disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) |
+| `taskCountMax` | Maximum number of Kinesis ingestion tasks. Must be greater than or equal to `taskCountMin`. If greater than `{numKinesisShards}`, the maximum number of reading tasks is `{numKinesisShards}` and `taskCountMax` is ignored.  | yes |
+| `taskCountMin` | Minimum number of Kinesis ingestion tasks. When you enable the auto scaler, Druid ignores the value of taskCount in `IOConfig` and uses`taskCountMin` for the initial number of tasks to launch.| yes |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |
+
+### Lag Based AutoScaler Strategy Related Properties
+
+The Kinesis indexing service reports lag metrics measured in time milliseconds rather than message count which is used by Kafka.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `lagCollectionIntervalMillis` | Period of lag points collection.  | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
+| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
+| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
+| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to delay after the supervisor starts before the first scale logic check. | no (default == 300000) |
+| `scaleActionPeriodMillis` | Frequency in milliseconds to check if a scale action is triggered | no (default == 60000) |
+| `scaleInStep` | Number of tasks to reduce at a time when scaling down | no (default == 1) |
+| `scaleOutStep` | Number of tasks to add at a time when scaling out | no (default == 2) |
+
+The following example demonstrates a supervisor spec with `lagBased` autoScaler enabled:
+```json
+{
+  "type": "kinesis",
+  "dataSchema": {
+    "dataSource": "metrics-kinesis",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": [],
+      "dimensionExclusions": [
+        "timestamp",
+        "value"
+      ]
+    },
+    "metricsSpec": [
+      {
+        "name": "count",
+        "type": "count"
+      },
+      {
+        "name": "value_sum",
+        "fieldName": "value",
+        "type": "doubleSum"
+      },
+      {
+        "name": "value_min",
+        "fieldName": "value",
+        "type": "doubleMin"
+      },
+      {
+        "name": "value_max",
+        "fieldName": "value",
+        "type": "doubleMax"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "HOUR",
+      "queryGranularity": "NONE"
+    }
+  },
+  "ioConfig": {
+    "stream": "metrics",
+    "autoScalerConfig": {
+      "enableTaskAutoScaler": true,
+      "taskCountMax": 6,
+      "taskCountMin": 2,
+      "minTriggerScaleActionFrequencyMillis": 600000,
+      "autoScalerStrategy": "lagBased",
+      "lagCollectionIntervalMillis": 30000,
+      "lagCollectionRangeMillis": 600000,
+      "scaleOutThreshold": 600000,
+      "triggerScaleOutFractionThreshold": 0.3,
+      "scaleInThreshold": 100000,
+      "triggerScaleInFractionThreshold": 0.9,
+      "scaleActionStartDelayMillis": 300000,
+      "scaleActionPeriodMillis": 60000,
+      "scaleInStep": 1,
+      "scaleOutStep": 2
+    },
+    "inputFormat": {
+      "type": "json"
+    },
+    "endpoint": "kinesis.us-east-1.amazonaws.com",
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT1H",
+    "recordsPerFetch": 2000,
+    "fetchDelayMillis": 1000
+  },
+  "tuningConfig": {
+    "type": "kinesis",
+    "maxRowsPerSegment": 5000000
+  }
+}
+```
 
 #### Specifying data format
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 92defd2..e33f458 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -379,11 +379,19 @@
     return true;
   }
 
-  // not yet supported, will be implemented in the future maybe? need to find a proper way to measure kinesis lag.
+  // Unlike the Kafka Indexing Service,
+  // Kinesis reports lag metrics measured in time difference in milliseconds between the current sequence number and latest sequence number,
+  // rather than message count.
   @Override
   public LagStats computeLagStats()
   {
-    throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
+    Map<String, Long> partitionTimeLags = getPartitionTimeLag();
+
+    if (partitionTimeLags == null) {
+      return new LagStats(0, 0, 0);
+    }
+
+    return computeLags(partitionTimeLags);
   }
 
   @Override
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index b43cece..41ae876 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -93,12 +93,6 @@
         lateMessageRejectionStartDateTime
     );
 
-    // for now dynamic Allocation Tasks is not supported here
-    // throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec.
-    if (autoScalerConfig != null) {
-      throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
-    }
-
     this.endpoint = endpoint != null
                     ? endpoint
                     : (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
@@ -157,6 +151,7 @@
            ", endpoint='" + endpoint + '\'' +
            ", replicas=" + getReplicas() +
            ", taskCount=" + getTaskCount() +
+           ", autoScalerConfig=" + getAutoscalerConfig() +
            ", taskDuration=" + getTaskDuration() +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 2a806c8..d1ba49b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -58,6 +58,7 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@@ -67,6 +68,7 @@
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -284,6 +286,158 @@
   }
 
   @Test
+  public void testNoInitialStateWithAutoScaleOut() throws Exception
+  {
+    HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
+    autoScalerConfigMap.put("enableTaskAutoScaler", true);
+    autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
+    autoScalerConfigMap.put("scaleOutThreshold", 0);
+    autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.0);
+    autoScalerConfigMap.put("scaleInThreshold", 1000000);
+    autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
+    autoScalerConfigMap.put("taskCountMax", 2);
+    autoScalerConfigMap.put("taskCountMin", 1);
+    autoScalerConfigMap.put("scaleInStep", 1);
+    autoScalerConfigMap.put("scaleOutStep", 2);
+    autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
+    supervisor = getTestableSupervisor(
+            1,
+            1,
+            true,
+            "PT1H",
+            null,
+            null,
+            false,
+            null,
+            null,
+            autoScalerConfig
+            );
+    KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
+    SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
+
+    supervisorRecordSupplier.assign(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
+            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
+            .anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getAssignment())
+            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
+            .anyTimes();
+    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+    EasyMock.expectLastCall().anyTimes();
+
+    Capture<KinesisIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+            new KinesisDataSourceMetadata(
+                    null
+            )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeScale);
+    autoscaler.start();
+
+    supervisor.runInternal();
+    verifyAll();
+    Thread.sleep(1 * 1000);
+    int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountAfterScale);
+  }
+
+  @Test
+  public void testNoInitialStateWithAutoScaleIn() throws Exception
+  {
+    HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
+    autoScalerConfigMap.put("enableTaskAutoScaler", true);
+    autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
+    autoScalerConfigMap.put("scaleOutThreshold", 1000000);
+    autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.8);
+    autoScalerConfigMap.put("scaleInThreshold", 0);
+    autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.0);
+    autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
+    autoScalerConfigMap.put("taskCountMax", 2);
+    autoScalerConfigMap.put("taskCountMin", 1);
+    autoScalerConfigMap.put("scaleInStep", 1);
+    autoScalerConfigMap.put("scaleOutStep", 2);
+    autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
+    supervisor = getTestableSupervisor(
+            1,
+            2,
+            true,
+            "PT1H",
+            null,
+            null,
+            false,
+            null,
+            null,
+            autoScalerConfig
+    );
+
+    KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
+    SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
+
+    supervisorRecordSupplier.assign(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
+            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
+            .anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getAssignment())
+            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
+            .anyTimes();
+    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+    EasyMock.expectLastCall().anyTimes();
+
+    Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock
+            .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+            .andReturn(new KinesisDataSourceMetadata(null))
+            .anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
+    replayAll();
+
+    int taskCountInit = supervisor.getIoConfig().getTaskCount();
+    // when enable autoScaler the init taskCount will be equal to taskCountMin
+    Assert.assertEquals(1, taskCountInit);
+    supervisor.getIoConfig().setTaskCount(2);
+
+    supervisor.start();
+    int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountBeforeScale);
+    autoscaler.start();
+
+    supervisor.runInternal();
+    verifyAll();
+    Thread.sleep(1 * 1000);
+    int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountAfterScale);
+  }
+
+  @Test
   public void testRecordSupplier()
   {
     KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -347,69 +501,64 @@
   }
 
   @Test
-  public void testKinesisIOConfig()
+  public void testKinesisIOConfigInitAndAutoscalerConfigCreation()
   {
-    Exception e = null;
-    try {
-      KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
-              STREAM,
-              INPUT_FORMAT,
-              "awsEndpoint",
-              null,
-              1,
-              1,
-              new Period("PT30M"),
-              new Period("P1D"),
-              new Period("PT30S"),
-              false,
-              new Period("PT30M"),
-              null,
-              null,
-              null,
-              100,
-              1000,
-              null,
-              null,
-              null,
-              false
-      );
-      AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
-      Assert.assertNull(autoScalerConfig);
-    }
-    catch (Exception ex) {
-      e = ex;
-    }
-    Assert.assertNull(e);
+    // create KinesisSupervisorIOConfig with autoScalerConfig null
+    KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig(
+            STREAM,
+            INPUT_FORMAT,
+            "awsEndpoint",
+            null,
+            1,
+            1,
+            new Period("PT30M"),
+            new Period("P1D"),
+            new Period("PT30S"),
+            false,
+            new Period("PT30M"),
+            null,
+            null,
+            null,
+            100,
+            1000,
+            null,
+            null,
+            null,
+            false
+    );
 
-    try {
-      KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
-              STREAM,
-              INPUT_FORMAT,
-              "awsEndpoint",
-              null,
-              1,
-              1,
-              new Period("PT30M"),
-              new Period("P1D"),
-              new Period("PT30S"),
-              false,
-              new Period("PT30M"),
-              null,
-              null,
-              null,
-              100,
-              1000,
-              null,
-              null,
-              OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
-              false
-      );
-    }
-    catch (Exception ex) {
-      e = ex;
-    }
-    Assert.assertNotNull(e);
-    Assert.assertTrue(e instanceof UnsupportedOperationException);
+    AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
+    Assert.assertNull(autoscalerConfigNull);
+
+    // create KinesisSupervisorIOConfig with autoScalerConfig Empty
+    KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig(
+            STREAM,
+            INPUT_FORMAT,
+            "awsEndpoint",
+            null,
+            1,
+            1,
+            new Period("PT30M"),
+            new Period("P1D"),
+            new Period("PT30S"),
+            false,
+            new Period("PT30M"),
+            null,
+            null,
+            null,
+            100,
+            1000,
+            null,
+            null,
+             OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+            false
+    );
+
+    AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
+    Assert.assertNotNull(autoscalerConfig);
+    Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
+    Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());
+    Assert.assertTrue(autoscalerConfig.toString().contains("autoScalerConfig"));
   }
 
   @Test
@@ -4895,6 +5044,7 @@
         earlyMessageRejectionPeriod,
         false,
         null,
+        null,
         null
     );
   }
@@ -4908,7 +5058,8 @@
       Period earlyMessageRejectionPeriod,
       boolean suspended,
       Integer recordsPerFetch,
-      Integer fetchDelayMillis
+      Integer fetchDelayMillis,
+      AutoScalerConfig autoScalerConfig
   )
   {
     KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -4930,7 +5081,7 @@
         fetchDelayMillis,
         null,
         null,
-        null,
+         autoScalerConfig,
         false
     );
 
@@ -5303,6 +5454,8 @@
 
   private class TestableKinesisSupervisor extends KinesisSupervisor
   {
+    private KinesisSupervisorSpec spec;
+
     TestableKinesisSupervisor(
         TaskStorage taskStorage,
         TaskMaster taskMaster,
@@ -5323,6 +5476,12 @@
           rowIngestionMetersFactory,
           null
       );
+      this.spec = spec;
+    }
+
+    protected KinesisSupervisorSpec getKinesisSupervisorSpec()
+    {
+      return spec;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 66d1139..2074900 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -68,7 +68,6 @@
 
   /**
    * Computes maxLag, totalLag and avgLag
-   * Only supports Kafka ingestion so far.
    */
   LagStats computeLagStats();