KAFKA-19959: Update session and window store with new default when no open iterators (#21120)

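When the NPE fix for the `oldest-iterator-open-since-ms` metric was
applied, it only covered `MeteredKeyValueStore`. The same fix is needed
in `MeteredSessionStore` and `MeteredWindowStore` as well: the gauge now
reports `0` instead of `null` when there are no open iterators, and also
returns `0` if a `NoSuchElementException` is thrown while reading the
oldest open iterator.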

Reviewers: Matthias Sax <mjsax@apache.org>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 7794a6e..3767336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,6 +47,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -125,9 +126,13 @@
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
                 (config, now) -> {
+                try {
                     final Iterator<MeteredIterator> openIteratorsIterator = openIterators.iterator();
-                    return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
+                    return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : 0L;
+                } catch (final NoSuchElementException e) {
+                    return 0L;
                 }
+            }
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 1ba37da..de924fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -50,6 +50,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -144,10 +145,14 @@
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
-                (config, now) -> {
+            (config, now) -> {
+                try {
                     final Iterator<MeteredIterator> openIteratorsIterator = openIterators.iterator();
-                    return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
+                    return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : 0L;
+                } catch (final NoSuchElementException e) {
+                    return 0L;
                 }
+            }
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f92304f..0bfcfc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -530,6 +530,8 @@
         final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
+
         KeyValueIterator<String, String> second = null;
         final long secondTimestamp;
         try {
@@ -546,14 +548,14 @@
             }
 
             // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
         // no open iterators left, timestamp should be reset to 0
-        assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final MetricName metricName) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f227806..e19211e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -715,7 +715,7 @@
         final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
 
         KeyValueIterator<Windowed<String>, String> second = null;
         final long secondTimestamp;
@@ -733,14 +733,14 @@
             }
 
             // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 3cf17ff..e643f60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -499,7 +499,7 @@
         final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
 
         KeyValueIterator<Windowed<String>, String> second = null;
         final long secondTimestamp;
@@ -517,14 +517,14 @@
             }
 
             // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final String name) {