[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)

### Motivation

Many time user sees lower throughput in pulsar-spout even though standalone consumer can consume such msgRate easily. It would be hard to debug user's topology without enough information so, adding two metrics which can impact spout throughput.
- number of message filed: spout sleeps when it sees failed message so, it's important to have visibility of that count
- number of times spout-thread not found the message in queue: spout topology internally sleeps if it doesn't see any emitted tuple in collector after triggering `nextTuple()` api.

This metrics gives more visibility about consumer throughput.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index bbfe5cd..5a5ea59 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -58,6 +58,8 @@
     public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
     public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
     public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
+    public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
+    public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
     public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
     public static final String CONSUMER_RATE = "consumerRate";
     public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
@@ -78,6 +80,8 @@
     private Consumer<byte[]> consumer;
     private volatile long messagesReceived = 0;
     private volatile long messagesEmitted = 0;
+    private volatile long messagesFailed = 0;
+    private volatile long messageNotAvailableCount = 0;
     private volatile long pendingAcks = 0;
     private volatile long messageSizeReceived = 0;
 
@@ -157,7 +161,7 @@
                 pendingMessageRetries.putIfAbsent(id, messageRetries);
                 failedMessages.add(msg);
                 --pendingAcks;
-
+                messagesFailed++;
             } else {
                 LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
                 ack(msg);
@@ -203,6 +207,7 @@
                     } else {
                         // queue is empty and nothing to emit
                         done = true;
+                        messageNotAvailableCount++;
                     }
                 }
             } catch (PulsarClientException e) {
@@ -334,6 +339,8 @@
         metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
         metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
         metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+        metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+        metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
         metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
         metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
         metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
@@ -345,6 +352,8 @@
         messagesReceived = 0;
         messagesEmitted = 0;
         messageSizeReceived = 0;
+        messagesFailed = 0;
+        messageNotAvailableCount = 0;
     }
 
     @SuppressWarnings("rawtypes")