Adding more debug logs to increase visibility into StreamSupervisor notices queue size and processing time. (#11415)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f593f7c..d5e7257 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -93,6 +93,8 @@
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -825,10 +827,10 @@
synchronized (stopLock) {
if (stopGracefully) {
log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
- notices.add(new GracefulShutdownNotice());
+ addNotice(new GracefulShutdownNotice());
} else {
log.info("Posting ShutdownNotice");
- notices.add(new ShutdownNotice());
+ addNotice(new ShutdownNotice());
}
long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
@@ -865,7 +867,7 @@
public void reset(DataSourceMetadata dataSourceMetadata)
{
log.info("Posting ResetNotice");
- notices.add(new ResetNotice(dataSourceMetadata));
+ addNotice(new ResetNotice(dataSourceMetadata));
}
public ReentrantLock getRecordSupplierLock()
@@ -902,7 +904,11 @@
}
try {
+ Instant handleNoticeStartTime = Instant.now();
notice.handle();
+ Instant handleNoticeEndTime = Instant.now();
+ Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
+ log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize());
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
@@ -956,7 +962,7 @@
private Runnable buildRunTask()
{
- return () -> notices.add(new RunNotice());
+ return () -> addNotice(new RunNotice());
}
@Override
@@ -1274,7 +1280,7 @@
@Override
public void statusChanged(String taskId, TaskStatus status)
{
- notices.add(new RunNotice());
+ addNotice(new RunNotice());
}
}, Execs.directExecutor()
);
@@ -3109,6 +3115,7 @@
private void addNotice(Notice notice)
{
+ log.debug("Adding notice [%s] to notices queue", notice.getClass().getName());
notices.add(notice);
}