Add method Supervisor.computeLagForAutoScaler (#16314)

Tries to address the comments made on #16284 after merged.

Changes:
- Remove method `Supervisor.getLagMetric()`
- Add method `Supervisor.computeLagForAutoScaler()`
- Remove classes `LagMetric` and `LagMetricTest`
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index e1a7656..365a913 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,7 +39,6 @@
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -429,9 +428,10 @@
   }
 
   @Override
-  public LagMetric getLagMetricForAutoScaler()
+  public long computeLagForAutoScaler()
   {
-    return LagMetric.MAX;
+    LagStats lagStats = computeLagStats();
+    return lagStats == null ? 0L : lagStats.getMaxLag();
   }
 
   private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 7813725..f8618b0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -21,7 +21,6 @@
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.common.StringUtils;
@@ -155,13 +154,8 @@
       LOCK.lock();
       try {
         if (!spec.isSuspended()) {
-          LagStats lagStats = supervisor.computeLagStats();
-          if (lagStats == null) {
-            lagMetricsQueue.offer(0L);
-          } else {
-            long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
-            lagMetricsQueue.offer(lag > 0 ? lag : 0L);
-          }
+          long lag = supervisor.computeLagForAutoScaler();
+          lagMetricsQueue.offer(lag > 0 ? lag : 0L);
           log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
         } else {
           log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 8befa2a..9b9511c 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 
@@ -95,11 +94,12 @@
   LagStats computeLagStats();
 
   /**
-   * Used by AutoScaler to either scale by max/total/avg.
+   * Used by AutoScaler to make scaling decisions.
    */
-  default LagMetric getLagMetricForAutoScaler()
+  default long computeLagForAutoScaler()
   {
-    return LagMetric.TOTAL;
+    LagStats lagStats = computeLagStats();
+    return lagStats == null ? 0L : lagStats.getTotalLag();
   }
 
   int getActiveTaskGroupsCount();
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
deleted file mode 100644
index d3f00b5..0000000
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.overlord.supervisor.autoscaler;
-
-public enum LagMetric
-{
-  TOTAL,
-  MAX,
-  AVERAGE;
-}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index c7a6dfc..7b6e5fd 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -46,17 +46,4 @@
   {
     return avgLag;
   }
-
-  public long get(LagMetric metric)
-  {
-    switch (metric) {
-      case AVERAGE:
-        return avgLag;
-      case TOTAL:
-        return totalLag;
-      case MAX:
-        return maxLag;
-    }
-    throw new IllegalStateException("Unknown Metric");
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
similarity index 69%
rename from server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
rename to server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
index b518825..7981107 100644
--- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
@@ -19,24 +19,22 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
-public class LagStatsTest
+public class SupervisorTest
 {
-
   @Test
-  public void lagStatsByMetric()
+  public void testAutoScalerLagComputation()
   {
-    int max = 1;
-    int avg = 2;
-    int total = 3;
-    LagStats lag = new LagStats(max, total, avg);
+    Supervisor supervisor = Mockito.spy(Supervisor.class);
 
-    Assert.assertEquals(max, lag.get(LagMetric.MAX));
-    Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
-    Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
+    Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
+    Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
+
+    Mockito.when(supervisor.computeLagStats()).thenReturn(null);
+    Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
   }
 }