KAFKA-15541: Use LongAdder instead of AtomicInteger (#16076)

`LongAdder` performs better than `AtomicInteger` when under contention
from many threads. Since it's possible that many Interactive Query
threads could create a large number of `KeyValueIterator`s, we don't
want contention on a metric to be a performance bottleneck.

The trade-off is memory, as `LongAdder` uses more memory to space out
independent counters across different cache lines. In practice, I don't
expect this to cause problems, as we only construct one `LongAdder` per
store.

Reviewers: Matthias J. Sax <matthias@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 85a7fe9..f4c2b3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -52,7 +52,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -95,7 +95,7 @@
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
-    protected AtomicInteger numOpenIterators = new AtomicInteger(0);
+    protected LongAdder numOpenIterators = new LongAdder();
 
     @SuppressWarnings("rawtypes")
     private final Map<Class, QueryHandler> queryHandlers =
@@ -168,7 +168,7 @@
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
-                (config, now) -> numOpenIterators.get());
+                (config, now) -> numOpenIterators.sum());
     }
 
     protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@@ -467,7 +467,7 @@
             this.iter = iter;
             this.sensor = sensor;
             this.startNs = time.nanoseconds();
-            numOpenIterators.incrementAndGet();
+            numOpenIterators.increment();
         }
 
         @Override
@@ -491,7 +491,7 @@
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrementAndGet();
+                numOpenIterators.decrement();
             }
         }
 
@@ -515,7 +515,7 @@
             this.sensor = sensor;
             this.valueDeserializer = valueDeserializer;
             this.startNs = time.nanoseconds();
-            numOpenIterators.incrementAndGet();
+            numOpenIterators.increment();
         }
 
         @Override
@@ -539,7 +539,7 @@
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrementAndGet();
+                numOpenIterators.decrement();
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
index 4663ef5..92c315c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 import org.apache.kafka.common.metrics.Sensor;
@@ -28,7 +28,7 @@
 
     private final VersionedRecordIterator<byte[]> iterator;
     private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
-    private final AtomicInteger numOpenIterators;
+    private final LongAdder numOpenIterators;
     private final Sensor sensor;
     private final Time time;
     private final long startNs;
@@ -37,14 +37,14 @@
                                                  final Sensor sensor,
                                                  final Time time,
                                                  final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
-                                                 final AtomicInteger numOpenIterators) {
+                                                 final LongAdder numOpenIterators) {
         this.iterator = iterator;
         this.deserializeValue = deserializeValue;
         this.numOpenIterators = numOpenIterators;
         this.sensor = sensor;
         this.time = time;
         this.startNs = time.nanoseconds();
-        numOpenIterators.incrementAndGet();
+        numOpenIterators.increment();
     }
 
 
@@ -54,7 +54,7 @@
             iterator.close();
         } finally {
             sensor.record(time.nanoseconds() - startNs);
-            numOpenIterators.decrementAndGet();
+            numOpenIterators.decrement();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 0cb9445..5208a0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,7 +47,7 @@
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -72,7 +72,7 @@
     private InternalProcessorContext<?, ?> context;
     private TaskId taskId;
 
-    private AtomicInteger numOpenIterators = new AtomicInteger(0);
+    private LongAdder numOpenIterators = new LongAdder();
 
     @SuppressWarnings("rawtypes")
     private final Map<Class, QueryHandler> queryHandlers =
@@ -137,7 +137,7 @@
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
-                (config, now) -> numOpenIterators.get());
+                (config, now) -> numOpenIterators.sum());
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 3c39998..757a1bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -325,7 +325,7 @@
             this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
             this.startNs = time.nanoseconds();
             this.returnPlainValue = returnPlainValue;
-            numOpenIterators.incrementAndGet();
+            numOpenIterators.increment();
         }
 
         @Override
@@ -353,7 +353,7 @@
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrementAndGet();
+                numOpenIterators.decrement();
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 3f7289e..ce7f022 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -49,7 +49,7 @@
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -76,7 +76,7 @@
     private InternalProcessorContext<?, ?> context;
     private TaskId taskId;
 
-    private AtomicInteger numOpenIterators = new AtomicInteger(0);
+    private LongAdder numOpenIterators = new LongAdder();
 
     @SuppressWarnings("rawtypes")
     private final Map<Class, QueryHandler> queryHandlers =
@@ -156,7 +156,7 @@
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
-                (config, now) -> numOpenIterators.get());
+                (config, now) -> numOpenIterators.sum());
     }
 
     @Deprecated
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 1294cfc..62fb7b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -22,7 +22,7 @@
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
@@ -34,7 +34,7 @@
     private final Function<byte[], V> valueFrom;
     private final long startNs;
     private final Time time;
-    private final AtomicInteger numOpenIterators;
+    private final LongAdder numOpenIterators;
 
     MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
                                final Sensor operationSensor,
@@ -42,7 +42,7 @@
                                final StreamsMetrics metrics,
                                final Function<byte[], V> valueFrom,
                                final Time time,
-                               final AtomicInteger numOpenIterators) {
+                               final LongAdder numOpenIterators) {
         this.iter = iter;
         this.operationSensor = operationSensor;
         this.iteratorSensor = iteratorSensor;
@@ -51,7 +51,7 @@
         this.startNs = time.nanoseconds();
         this.time = time;
         this.numOpenIterators = numOpenIterators;
-        numOpenIterators.incrementAndGet();
+        numOpenIterators.increment();
     }
 
     @Override
@@ -73,7 +73,7 @@
             final long duration = time.nanoseconds() - startNs;
             operationSensor.record(duration);
             iteratorSensor.record(duration);
-            numOpenIterators.decrementAndGet();
+            numOpenIterators.decrement();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index e69b27c..e663a2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
@@ -37,7 +37,7 @@
     private final Function<byte[], V> deserializeValue;
     private final long startNs;
     private final Time time;
-    private final AtomicInteger numOpenIterators;
+    private final LongAdder numOpenIterators;
 
     MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
                                     final Sensor operationSensor,
@@ -46,7 +46,7 @@
                                     final Function<byte[], K> deserializeKey,
                                     final Function<byte[], V> deserializeValue,
                                     final Time time,
-                                    final AtomicInteger numOpenIterators) {
+                                    final LongAdder numOpenIterators) {
         this.iter = iter;
         this.operationSensor = operationSensor;
         this.iteratorSensor = iteratorSensor;
@@ -56,7 +56,7 @@
         this.startNs = time.nanoseconds();
         this.time = time;
         this.numOpenIterators = numOpenIterators;
-        numOpenIterators.incrementAndGet();
+        numOpenIterators.increment();
     }
 
     @Override
@@ -83,7 +83,7 @@
             final long duration = time.nanoseconds() - startNs;
             operationSensor.record(duration);
             iteratorSensor.record(duration);
-            numOpenIterators.decrementAndGet();
+            numOpenIterators.decrement();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index aa9d9c3..fd2cae1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -438,7 +438,7 @@
                                                 final String storeType,
                                                 final String storeName,
                                                 final StreamsMetricsImpl streamsMetrics,
-                                                final Gauge<Integer> numOpenIteratorsGauge) {
+                                                final Gauge<Long> numOpenIteratorsGauge) {
         streamsMetrics.addStoreLevelMutableMetric(
                 taskId,
                 storeType,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index d2227bc..7e78084 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -451,13 +451,13 @@
         final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
         assertThat(openIteratorsMetric, not(nullValue()));
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
 
         try (final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer)) {
-            assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
         }
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3c5a923..994a410 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -615,13 +615,13 @@
         final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
         assertThat(openIteratorsMetric, not(nullValue()));
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
 
         try (final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY)) {
-            assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
         }
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 7e37440..9fbd6ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -449,13 +449,13 @@
         final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
         assertThat(openIteratorsMetric, not(nullValue()));
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
 
         try (final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all()) {
-            assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
         }
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 10b8e2e..0915942 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -380,15 +380,15 @@
         final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
         assertThat(openIteratorsMetric, not(nullValue()));
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
 
         final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
 
         try (final VersionedRecordIterator<String> iterator = result.getResult()) {
-            assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
         }
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index aa63703..c57bd19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -456,13 +456,13 @@
         final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
         assertThat(openIteratorsMetric, not(nullValue()));
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
 
         try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
-            assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
         }
 
-        assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
     }
 
     @Test