KAFKA-15541: Use LongAdder instead of AtomicInteger (#16076)
`LongAdder` performs better than `AtomicInteger` when under contention
from many threads. Since it's possible that many Interactive Query
threads could create a large number of `KeyValueIterator`s, we don't
want contention on a metric to be a performance bottleneck.
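The trade-off is memory, as `LongAdder` uses more memory to space out
independent counters across different cache lines. In practice, I don't
expect this to cause too many problems, as we only construct one
`LongAdder` per store.
Note that `LongAdder.sum()` returns a `long`, so the `num-open-iterators`
gauge changes from `Gauge<Integer>` to `Gauge<Long>`; anything that casts
the metric value to `Integer` must now use `Long`. `sum()` is also not an
atomic snapshot under concurrent updates, which is acceptable for a gauge.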
Reviewers: Matthias J. Sax <matthias@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 85a7fe9..f4c2b3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -52,7 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -95,7 +95,7 @@
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
- protected AtomicInteger numOpenIterators = new AtomicInteger(0);
+ protected LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@@ -168,7 +168,7 @@
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
- (config, now) -> numOpenIterators.get());
+ (config, now) -> numOpenIterators.sum());
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
@@ -467,7 +467,7 @@
this.iter = iter;
this.sensor = sensor;
this.startNs = time.nanoseconds();
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@Override
@@ -491,7 +491,7 @@
final long duration = time.nanoseconds() - startNs;
sensor.record(duration);
iteratorDurationSensor.record(duration);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
@@ -515,7 +515,7 @@
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
this.startNs = time.nanoseconds();
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@Override
@@ -539,7 +539,7 @@
final long duration = time.nanoseconds() - startNs;
sensor.record(duration);
iteratorDurationSensor.record(duration);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
index 4663ef5..92c315c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.apache.kafka.common.metrics.Sensor;
@@ -28,7 +28,7 @@
private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
- private final AtomicInteger numOpenIterators;
+ private final LongAdder numOpenIterators;
private final Sensor sensor;
private final Time time;
private final long startNs;
@@ -37,14 +37,14 @@
final Sensor sensor,
final Time time,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
- final AtomicInteger numOpenIterators) {
+ final LongAdder numOpenIterators) {
this.iterator = iterator;
this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators;
this.sensor = sensor;
this.time = time;
this.startNs = time.nanoseconds();
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@@ -54,7 +54,7 @@
iterator.close();
} finally {
sensor.record(time.nanoseconds() - startNs);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 0cb9445..5208a0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,7 +47,7 @@
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -72,7 +72,7 @@
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
- private AtomicInteger numOpenIterators = new AtomicInteger(0);
+ private LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@@ -137,7 +137,7 @@
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
- (config, now) -> numOpenIterators.get());
+ (config, now) -> numOpenIterators.sum());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 3c39998..757a1bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -325,7 +325,7 @@
this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
this.startNs = time.nanoseconds();
this.returnPlainValue = returnPlainValue;
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@Override
@@ -353,7 +353,7 @@
final long duration = time.nanoseconds() - startNs;
sensor.record(duration);
iteratorDurationSensor.record(duration);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 3f7289e..ce7f022 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -49,7 +49,7 @@
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -76,7 +76,7 @@
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
- private AtomicInteger numOpenIterators = new AtomicInteger(0);
+ private LongAdder numOpenIterators = new LongAdder();
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
@@ -156,7 +156,7 @@
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
- (config, now) -> numOpenIterators.get());
+ (config, now) -> numOpenIterators.sum());
}
@Deprecated
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 1294cfc..62fb7b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -22,7 +22,7 @@
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
@@ -34,7 +34,7 @@
private final Function<byte[], V> valueFrom;
private final long startNs;
private final Time time;
- private final AtomicInteger numOpenIterators;
+ private final LongAdder numOpenIterators;
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor operationSensor,
@@ -42,7 +42,7 @@
final StreamsMetrics metrics,
final Function<byte[], V> valueFrom,
final Time time,
- final AtomicInteger numOpenIterators) {
+ final LongAdder numOpenIterators) {
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
@@ -51,7 +51,7 @@
this.startNs = time.nanoseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@Override
@@ -73,7 +73,7 @@
final long duration = time.nanoseconds() - startNs;
operationSensor.record(duration);
iteratorSensor.record(duration);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index e69b27c..e663a2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -24,7 +24,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
@@ -37,7 +37,7 @@
private final Function<byte[], V> deserializeValue;
private final long startNs;
private final Time time;
- private final AtomicInteger numOpenIterators;
+ private final LongAdder numOpenIterators;
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
final Sensor operationSensor,
@@ -46,7 +46,7 @@
final Function<byte[], K> deserializeKey,
final Function<byte[], V> deserializeValue,
final Time time,
- final AtomicInteger numOpenIterators) {
+ final LongAdder numOpenIterators) {
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
@@ -56,7 +56,7 @@
this.startNs = time.nanoseconds();
this.time = time;
this.numOpenIterators = numOpenIterators;
- numOpenIterators.incrementAndGet();
+ numOpenIterators.increment();
}
@Override
@@ -83,7 +83,7 @@
final long duration = time.nanoseconds() - startNs;
operationSensor.record(duration);
iteratorSensor.record(duration);
- numOpenIterators.decrementAndGet();
+ numOpenIterators.decrement();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index aa9d9c3..fd2cae1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -438,7 +438,7 @@
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics,
- final Gauge<Integer> numOpenIteratorsGauge) {
+ final Gauge<Long> numOpenIteratorsGauge) {
streamsMetrics.addStoreLevelMutableMetric(
taskId,
storeType,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index d2227bc..7e78084 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -451,13 +451,13 @@
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer)) {
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 3c5a923..994a410 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -615,13 +615,13 @@
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFetch(KEY)) {
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 7e37440..9fbd6ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -449,13 +449,13 @@
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<String, ValueAndTimestamp<String>> iterator = metered.all()) {
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 10b8e2e..0915942 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -380,15 +380,15 @@
final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
final QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
try (final VersionedRecordIterator<String> iterator = result.getResult()) {
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index aa63703..c57bd19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -456,13 +456,13 @@
final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
assertThat(openIteratorsMetric, not(nullValue()));
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
try (final KeyValueIterator<Windowed<String>, String> iterator = store.all()) {
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
}
- assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0));
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
}
@Test