Adding more debug logs to increase visibility into StreamSupervisor notices queue size and processing time. (#11415)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f593f7c..d5e7257 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -93,6 +93,8 @@
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -825,10 +827,10 @@
           synchronized (stopLock) {
             if (stopGracefully) {
               log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
-              notices.add(new GracefulShutdownNotice());
+              addNotice(new GracefulShutdownNotice());
             } else {
               log.info("Posting ShutdownNotice");
-              notices.add(new ShutdownNotice());
+              addNotice(new ShutdownNotice());
             }
 
             long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
@@ -865,7 +867,7 @@
   public void reset(DataSourceMetadata dataSourceMetadata)
   {
     log.info("Posting ResetNotice");
-    notices.add(new ResetNotice(dataSourceMetadata));
+    addNotice(new ResetNotice(dataSourceMetadata));
   }
 
   public ReentrantLock getRecordSupplierLock()
@@ -902,7 +904,11 @@
                   }
 
                   try {
+                    Instant handleNoticeStartTime = Instant.now();
                     notice.handle();
+                    Instant handleNoticeEndTime = Instant.now();
+                    Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
+                    log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize());
                   }
                   catch (Throwable e) {
                     stateManager.recordThrowableEvent(e);
@@ -956,7 +962,7 @@
 
   private Runnable buildRunTask()
   {
-    return () -> notices.add(new RunNotice());
+    return () -> addNotice(new RunNotice());
   }
 
   @Override
@@ -1274,7 +1280,7 @@
             @Override
             public void statusChanged(String taskId, TaskStatus status)
             {
-              notices.add(new RunNotice());
+              addNotice(new RunNotice());
             }
           }, Execs.directExecutor()
       );
@@ -3109,6 +3115,7 @@
 
   private void addNotice(Notice notice)
   {
+    log.debug("Adding notice [%s] to notices queue", notice.getClass().getName());
     notices.add(notice);
   }