add ingest/notices/queueSize metric to give visibility into supervisor notices queue size (#11417)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index d4ca7ad..0ed0ff4 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -181,6 +181,8 @@
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource, taskId, taskType.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
+|`ingest/notices/queueSize`|Number of pending notices to be processed by the supervisor.|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
+|`ingest/notices/time`|Milliseconds taken by the supervisor to process a notice.|dataSource, noticeType.| < 1s. |
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d5e7257..ec2a526 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -274,6 +274,13 @@
*/
private interface Notice
{
+ /**
+ * Returns a descriptive label for this notice type. Used for metrics emission and logging.
+ *
+ * @return notice type label
+ */
+ String getType();
+
void handle() throws ExecutionException, InterruptedException, TimeoutException;
}
@@ -312,6 +319,8 @@
private class RunNotice implements Notice
{
+ private static final String TYPE = "run_notice";
+
@Override
public void handle()
{
@@ -323,12 +332,19 @@
runInternal();
}
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
}
// change taskCount without resubmitting.
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
+ private static final String TYPE = "dynamic_allocation_tasks_notice";
DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
{
@@ -382,6 +398,12 @@
}
}
}
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
}
/**
@@ -458,6 +480,8 @@
private class ShutdownNotice implements Notice
{
+ private static final String TYPE = "shutdown_notice";
+
@Override
public void handle() throws InterruptedException, ExecutionException, TimeoutException
{
@@ -468,11 +492,18 @@
stopLock.notifyAll();
}
}
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
}
private class ResetNotice implements Notice
{
final DataSourceMetadata dataSourceMetadata;
+ private static final String TYPE = "reset_notice";
ResetNotice(DataSourceMetadata dataSourceMetadata)
{
@@ -484,12 +515,19 @@
{
resetInternal(dataSourceMetadata);
}
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
}
protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
+ private static final String TYPE = "checkpoint_notice";
CheckpointNotice(
int taskGroupId,
@@ -560,6 +598,12 @@
return true;
}
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
}
// Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
@@ -908,7 +952,9 @@
notice.handle();
Instant handleNoticeEndTime = Instant.now();
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
- log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize());
+ String noticeType = notice.getType();
+ log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d] for datasource [%s]", noticeType, timeElapsed.toMillis(), getNoticesQueueSize(), dataSource);
+ emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
@@ -3588,6 +3634,7 @@
/**
* default implementation, schedules periodic fetch of latest offsets and {@link #emitLag} reporting for Kafka and Kinesis
+ * and periodic reporting of {@link #emitNoticesQueueSize} for various data sources.
*/
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
@@ -3610,6 +3657,12 @@
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
+ reportingExec.scheduleAtFixedRate(
+ this::emitNoticesQueueSize,
+ ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
+ spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ TimeUnit.MILLISECONDS
+ );
}
/**
@@ -3658,6 +3711,39 @@
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
}
+ protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
+ {
+ try {
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setDimension("noticeType", noticeType)
+ .setDimension("dataSource", dataSource)
+ .build("ingest/notices/time", timeInMillis)
+ );
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to emit notices process time");
+ }
+ }
+
+ protected void emitNoticesQueueSize()
+ {
+ if (spec.isSuspended()) {
+ // don't emit metrics if supervisor is suspended
+ return;
+ }
+ try {
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setDimension("dataSource", dataSource)
+ .build("ingest/notices/queueSize", getNoticesQueueSize())
+ );
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to emit notices queue size");
+ }
+ }
+
protected void emitLag()
{
if (spec.isSuspended() || !stateManager.isSteadyState()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 91dd10a..5f5d47f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -88,6 +88,8 @@
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -97,6 +99,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
@@ -625,7 +628,7 @@
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(2);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -643,19 +646,23 @@
latch.await();
- Assert.assertEquals(6, emitter.getEvents().size());
- Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
- Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
- Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
- Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
- Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
- Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(3).toMap().get("metric"));
- Assert.assertEquals(45000L, emitter.getEvents().get(3).toMap().get("value"));
- Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(4).toMap().get("metric"));
- Assert.assertEquals(20000L, emitter.getEvents().get(4).toMap().get("value"));
- Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(5).toMap().get("metric"));
- Assert.assertEquals(15000L, emitter.getEvents().get(5).toMap().get("value"));
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag",
+ "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(6, events.size());
+ Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
+ Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+ Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
+ Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+ Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
+ Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+ Assert.assertEquals("ingest/test/lag/time", events.get(3).toMap().get("metric"));
+ Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
+ Assert.assertEquals("ingest/test/maxLag/time", events.get(4).toMap().get("metric"));
+ Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
+ Assert.assertEquals("ingest/test/avgLag/time", events.get(5).toMap().get("metric"));
+ Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
verifyAll();
}
@@ -664,7 +671,7 @@
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(2);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -682,13 +689,16 @@
latch.await();
- Assert.assertEquals(3, emitter.getEvents().size());
- Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
- Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
- Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
- Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
- Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(3, events.size());
+ Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
+ Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+ Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
+ Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+ Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
+ Assert.assertEquals(283L, events.get(2).toMap().get("value"));
verifyAll();
}
@@ -697,7 +707,7 @@
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(2);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
null,
@@ -715,13 +725,78 @@
latch.await();
- Assert.assertEquals(3, emitter.getEvents().size());
- Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(45000L, emitter.getEvents().get(0).toMap().get("value"));
- Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(1).toMap().get("metric"));
- Assert.assertEquals(20000L, emitter.getEvents().get(1).toMap().get("value"));
- Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(2).toMap().get("metric"));
- Assert.assertEquals(15000L, emitter.getEvents().get(2).toMap().get("value"));
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Arrays.asList("ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(3, events.size());
+ Assert.assertEquals("ingest/test/lag/time", events.get(0).toMap().get("metric"));
+ Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
+ Assert.assertEquals("ingest/test/maxLag/time", events.get(1).toMap().get("metric"));
+ Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
+ Assert.assertEquals("ingest/test/avgLag/time", events.get(2).toMap().get("metric"));
+ Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
+ verifyAll();
+ }
+
+ @Test
+ public void testEmitNoticesQueueSize() throws Exception
+ {
+ expectEmitterSupervisor(false);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
+ latch,
+ null,
+ null
+ );
+
+
+ supervisor.start();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+
+ latch.await();
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Collections.singletonList("ingest/notices/queueSize");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals("ingest/notices/queueSize", events.get(0).toMap().get("metric"));
+ Assert.assertEquals(0, events.get(0).toMap().get("value"));
+ Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+ verifyAll();
+ }
+
+ @Test
+ public void testEmitNoticesTime() throws Exception
+ {
+ expectEmitterSupervisor(false);
+ CountDownLatch latch = new CountDownLatch(2);
+ TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
+ latch,
+ null,
+ null
+ );
+ supervisor.start();
+ supervisor.emitNoticesTime();
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+ latch.await();
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Collections.singletonList("ingest/notices/time");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals("ingest/notices/time", events.get(0).toMap().get("metric"));
+ Assert.assertEquals(500L, events.get(0).toMap().get("value"));
+ Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+ Assert.assertEquals("dummyNoticeType", events.get(0).toMap().get("noticeType"));
verifyAll();
}
@@ -730,7 +805,7 @@
{
expectEmitterSupervisor(true);
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(2);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -748,10 +823,22 @@
latch.await();
- Assert.assertEquals(0, emitter.getEvents().size());
+ List<Event> events = emitter.getEvents();
+ List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag",
+ "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
+ events = filterMetrics(events, whitelist);
+ Assert.assertEquals(0, events.size());
verifyAll();
}
+ private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
+ {
+ List<Event> result = events.stream()
+ .filter(e -> whitelist.contains(e.toMap().get("metric").toString()))
+ .collect(Collectors.toList());
+ return result;
+ }
+
private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException
{
spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -1250,6 +1337,19 @@
}
@Override
+ protected void emitNoticesQueueSize()
+ {
+ super.emitNoticesQueueSize();
+ latch.countDown();
+ }
+
+ public void emitNoticesTime()
+ {
+ super.emitNoticeProcessTime("dummyNoticeType", 500);
+ latch.countDown();
+ }
+
+ @Override
public LagStats computeLagStats()
{
return null;
@@ -1265,6 +1365,12 @@
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
+ reportingExec.scheduleAtFixedRate(
+ this::emitNoticesQueueSize,
+ ioConfig.getStartDelay().getMillis(),
+ spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ TimeUnit.MILLISECONDS
+ );
}
}
diff --git a/website/.spelling b/website/.spelling
index d4b568c..0e0ab00 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1276,6 +1276,7 @@
netAddress
netHwaddr
netName
+noticeType
numComplexMetrics
numDimensions
numMetrics