Add flush interval time test to CoordinatorRuntimeTest and fix recordFlushIntervalTime Javadoc
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
index 6c87693..8b68dd6 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
@@ -68,9 +68,9 @@
     void recordFlushTime(long durationMs);
 
     /**
-     * Record the flush time.
+     * Record the flush interval time.
      *
-     * @param durationMs The flush time in milliseconds.
+     * @param durationMs The flush interval time in milliseconds.
      */
     void recordFlushIntervalTime(long durationMs);
 
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a2f25b2..29d954f 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4314,7 +4314,7 @@
     }
 
     @Test
-    public void testRecordFlushTime() throws Exception {
+    public void testRecordFlushTimeAndFlushIntervalTime() throws Exception {
         MockTimer timer = new MockTimer();
 
         // Writer sleeps for 10ms before appending records.
@@ -4394,6 +4394,7 @@
         assertEquals(List.of(
             records(firstBatchTimestamp, records.subList(0, 3))
         ), writer.entries(TP));
+        verify(runtimeMetrics, times(1)).recordFlushIntervalTime(0);
         verify(runtimeMetrics, times(1)).recordFlushTime(10);
 
         // Advance past the linger time.
@@ -4412,6 +4413,10 @@
             records(secondBatchTimestamp, records.subList(0, 3)),
             records(secondBatchTimestamp, records.subList(3, 4))
         ), writer.entries(TP));
+
+        // If the previous batch was full, this batch was allocated prior to the previous flush.
+        // Flush interval time = previous flush time + append.linger.ms.
+        verify(runtimeMetrics, times(1)).recordFlushIntervalTime(21);
         verify(runtimeMetrics, times(2)).recordFlushTime(10);
 
         // Commit and verify that writes are completed.