[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
### Motivation
Users often see lower throughput in the pulsar-spout even though a standalone consumer can easily consume at the same msgRate. It is hard to debug a user's topology without enough information, so this change adds two metrics that track conditions which can impact spout throughput.
- number of messages failed: the spout sleeps when it sees a failed message, so it's important to have visibility into that count
- number of times the spout thread did not find a message in the queue: the spout topology internally sleeps if it doesn't see any emitted tuple in the collector after triggering the `nextTuple()` API.
These metrics give more visibility into consumer throughput.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index bbfe5cd..5a5ea59 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -58,6 +58,8 @@
public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
+ public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
+ public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
public static final String CONSUMER_RATE = "consumerRate";
public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
@@ -78,6 +80,8 @@
private Consumer<byte[]> consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
+ private volatile long messagesFailed = 0;
+ private volatile long messageNotAvailableCount = 0;
private volatile long pendingAcks = 0;
private volatile long messageSizeReceived = 0;
@@ -157,7 +161,7 @@
pendingMessageRetries.putIfAbsent(id, messageRetries);
failedMessages.add(msg);
--pendingAcks;
-
+ messagesFailed++;
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
@@ -203,6 +207,7 @@
} else {
// queue is empty and nothing to emit
done = true;
+ messageNotAvailableCount++;
}
}
} catch (PulsarClientException e) {
@@ -334,6 +339,8 @@
metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+ metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+ metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
@@ -345,6 +352,8 @@
messagesReceived = 0;
messagesEmitted = 0;
messageSizeReceived = 0;
+ messagesFailed = 0;
+ messageNotAvailableCount = 0;
}
@SuppressWarnings("rawtypes")