Dynamic auto scale Kinesis-Stream ingest tasks (#10985)
* ready to test
* revert misc.xml
* document kinesis md
* Update docs/development/extensions-core/kafka-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update docs/development/extensions-core/kinesis-ingestion.md
* Update kafka-ingestion.md
remove leading `
* Update kinesis-ingestion.md
add missing `
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 6fa2262..dcebce0 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -147,7 +147,7 @@
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
-|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
+|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|
### Task Autoscaler Properties
diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md
index fcc4f64..8066d6b 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -146,6 +146,116 @@
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
|`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no|
+|`autoScalerConfig`|Object|Defines auto scaling behavior for Kinesis ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|
+
+### Task Autoscaler Properties
+
+> Note that Task AutoScaler is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or or absent Druid disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) |
+| `taskCountMax` | Maximum number of Kinesis ingestion tasks. Must be greater than or equal to `taskCountMin`. If greater than `{numKinesisShards}`, the maximum number of reading tasks is `{numKinesisShards}` and `taskCountMax` is ignored. | yes |
+| `taskCountMin` | Minimum number of Kinesis ingestion tasks. When you enable the auto scaler, Druid ignores the value of taskCount in `IOConfig` and uses`taskCountMin` for the initial number of tasks to launch.| yes |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |
+
+### Lag Based AutoScaler Strategy Related Properties
+
+The Kinesis indexing service reports lag metrics measured in time milliseconds rather than message count which is used by Kafka.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
+| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
+| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
+| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to delay after the supervisor starts before the first scale logic check. | no (default == 300000) |
+| `scaleActionPeriodMillis` | Frequency in milliseconds to check if a scale action is triggered | no (default == 60000) |
+| `scaleInStep` | Number of tasks to reduce at a time when scaling down | no (default == 1) |
+| `scaleOutStep` | Number of tasks to add at a time when scaling out | no (default == 2) |
+
+The following example demonstrates a supervisor spec with `lagBased` autoScaler enabled:
+```json
+{
+ "type": "kinesis",
+ "dataSchema": {
+ "dataSource": "metrics-kinesis",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "dimensionExclusions": [
+ "timestamp",
+ "value"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "value_sum",
+ "fieldName": "value",
+ "type": "doubleSum"
+ },
+ {
+ "name": "value_min",
+ "fieldName": "value",
+ "type": "doubleMin"
+ },
+ {
+ "name": "value_max",
+ "fieldName": "value",
+ "type": "doubleMax"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "NONE"
+ }
+ },
+ "ioConfig": {
+ "stream": "metrics",
+ "autoScalerConfig": {
+ "enableTaskAutoScaler": true,
+ "taskCountMax": 6,
+ "taskCountMin": 2,
+ "minTriggerScaleActionFrequencyMillis": 600000,
+ "autoScalerStrategy": "lagBased",
+ "lagCollectionIntervalMillis": 30000,
+ "lagCollectionRangeMillis": 600000,
+ "scaleOutThreshold": 600000,
+ "triggerScaleOutFractionThreshold": 0.3,
+ "scaleInThreshold": 100000,
+ "triggerScaleInFractionThreshold": 0.9,
+ "scaleActionStartDelayMillis": 300000,
+ "scaleActionPeriodMillis": 60000,
+ "scaleInStep": 1,
+ "scaleOutStep": 2
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ "endpoint": "kinesis.us-east-1.amazonaws.com",
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT1H",
+ "recordsPerFetch": 2000,
+ "fetchDelayMillis": 1000
+ },
+ "tuningConfig": {
+ "type": "kinesis",
+ "maxRowsPerSegment": 5000000
+ }
+}
+```
#### Specifying data format
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 92defd2..e33f458 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -379,11 +379,19 @@
return true;
}
- // not yet supported, will be implemented in the future maybe? need to find a proper way to measure kinesis lag.
+ // Unlike the Kafka Indexing Service,
+ // Kinesis reports lag metrics measured in time difference in milliseconds between the current sequence number and latest sequence number,
+ // rather than message count.
@Override
public LagStats computeLagStats()
{
- throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
+ Map<String, Long> partitionTimeLags = getPartitionTimeLag();
+
+ if (partitionTimeLags == null) {
+ return new LagStats(0, 0, 0);
+ }
+
+ return computeLags(partitionTimeLags);
}
@Override
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index b43cece..41ae876 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -93,12 +93,6 @@
lateMessageRejectionStartDateTime
);
- // for now dynamic Allocation Tasks is not supported here
- // throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec.
- if (autoScalerConfig != null) {
- throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
- }
-
this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
@@ -157,6 +151,7 @@
", endpoint='" + endpoint + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
+ ", autoScalerConfig=" + getAutoscalerConfig() +
", taskDuration=" + getTaskDuration() +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 2a806c8..d1ba49b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -58,6 +58,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@@ -67,6 +68,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -284,6 +286,158 @@
}
@Test
+ public void testNoInitialStateWithAutoScaleOut() throws Exception
+ {
+ HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
+ autoScalerConfigMap.put("enableTaskAutoScaler", true);
+ autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
+ autoScalerConfigMap.put("scaleOutThreshold", 0);
+ autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.0);
+ autoScalerConfigMap.put("scaleInThreshold", 1000000);
+ autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
+ autoScalerConfigMap.put("taskCountMax", 2);
+ autoScalerConfigMap.put("taskCountMin", 1);
+ autoScalerConfigMap.put("scaleInStep", 1);
+ autoScalerConfigMap.put("scaleOutStep", 2);
+ autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
+ supervisor = getTestableSupervisor(
+ 1,
+ 1,
+ true,
+ "PT1H",
+ null,
+ null,
+ false,
+ null,
+ null,
+ autoScalerConfig
+ );
+ KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
+ SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
+
+ supervisorRecordSupplier.assign(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
+ .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
+ .anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getAssignment())
+ .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
+ .anyTimes();
+ supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+ supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+ EasyMock.expectLastCall().anyTimes();
+
+ Capture<KinesisIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KinesisDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScale);
+ autoscaler.start();
+
+ supervisor.runInternal();
+ verifyAll();
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountAfterScale);
+ }
+
+ @Test
+ public void testNoInitialStateWithAutoScaleIn() throws Exception
+ {
+ HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
+ autoScalerConfigMap.put("enableTaskAutoScaler", true);
+ autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
+ autoScalerConfigMap.put("scaleOutThreshold", 1000000);
+ autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.8);
+ autoScalerConfigMap.put("scaleInThreshold", 0);
+ autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.0);
+ autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
+ autoScalerConfigMap.put("taskCountMax", 2);
+ autoScalerConfigMap.put("taskCountMin", 1);
+ autoScalerConfigMap.put("scaleInStep", 1);
+ autoScalerConfigMap.put("scaleOutStep", 2);
+ autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
+ supervisor = getTestableSupervisor(
+ 1,
+ 2,
+ true,
+ "PT1H",
+ null,
+ null,
+ false,
+ null,
+ null,
+ autoScalerConfig
+ );
+
+ KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
+ SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
+
+ supervisorRecordSupplier.assign(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
+ .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
+ .anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getAssignment())
+ .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
+ .anyTimes();
+ supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+ supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+ EasyMock.expectLastCall().anyTimes();
+
+ Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock
+ .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new KinesisDataSourceMetadata(null))
+ .anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
+ replayAll();
+
+ int taskCountInit = supervisor.getIoConfig().getTaskCount();
+ // when enable autoScaler the init taskCount will be equal to taskCountMin
+ Assert.assertEquals(1, taskCountInit);
+ supervisor.getIoConfig().setTaskCount(2);
+
+ supervisor.start();
+ int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountBeforeScale);
+ autoscaler.start();
+
+ supervisor.runInternal();
+ verifyAll();
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountAfterScale);
+ }
+
+ @Test
public void testRecordSupplier()
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -347,69 +501,64 @@
}
@Test
- public void testKinesisIOConfig()
+ public void testKinesisIOConfigInitAndAutoscalerConfigCreation()
{
- Exception e = null;
- try {
- KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
- STREAM,
- INPUT_FORMAT,
- "awsEndpoint",
- null,
- 1,
- 1,
- new Period("PT30M"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null,
- null,
- 100,
- 1000,
- null,
- null,
- null,
- false
- );
- AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
- Assert.assertNull(autoScalerConfig);
- }
- catch (Exception ex) {
- e = ex;
- }
- Assert.assertNull(e);
+ // create KinesisSupervisorIOConfig with autoScalerConfig null
+ KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ null,
+ false
+ );
- try {
- KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
- STREAM,
- INPUT_FORMAT,
- "awsEndpoint",
- null,
- 1,
- 1,
- new Period("PT30M"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null,
- null,
- 100,
- 1000,
- null,
- null,
- OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
- false
- );
- }
- catch (Exception ex) {
- e = ex;
- }
- Assert.assertNotNull(e);
- Assert.assertTrue(e instanceof UnsupportedOperationException);
+ AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
+ Assert.assertNull(autoscalerConfigNull);
+
+ // create KinesisSupervisorIOConfig with autoScalerConfig Empty
+ KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+ false
+ );
+
+ AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
+ Assert.assertNotNull(autoscalerConfig);
+ Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
+ Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());
+ Assert.assertTrue(autoscalerConfig.toString().contains("autoScalerConfig"));
}
@Test
@@ -4895,6 +5044,7 @@
earlyMessageRejectionPeriod,
false,
null,
+ null,
null
);
}
@@ -4908,7 +5058,8 @@
Period earlyMessageRejectionPeriod,
boolean suspended,
Integer recordsPerFetch,
- Integer fetchDelayMillis
+ Integer fetchDelayMillis,
+ AutoScalerConfig autoScalerConfig
)
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -4930,7 +5081,7 @@
fetchDelayMillis,
null,
null,
- null,
+ autoScalerConfig,
false
);
@@ -5303,6 +5454,8 @@
private class TestableKinesisSupervisor extends KinesisSupervisor
{
+ private KinesisSupervisorSpec spec;
+
TestableKinesisSupervisor(
TaskStorage taskStorage,
TaskMaster taskMaster,
@@ -5323,6 +5476,12 @@
rowIngestionMetersFactory,
null
);
+ this.spec = spec;
+ }
+
+ protected KinesisSupervisorSpec getKinesisSupervisorSpec()
+ {
+ return spec;
}
@Override
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 66d1139..2074900 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -68,7 +68,6 @@
/**
* Computes maxLag, totalLag and avgLag
- * Only supports Kafka ingestion so far.
*/
LagStats computeLagStats();