eliminate `now` error at a fixed rate schedule executor (#7145)
diff --git a/CHANGES.md b/CHANGES.md
index 598f0d4..dae2bc0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -42,6 +42,7 @@
* Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
* OAL supports generating metrics from events.
* Support endpoint name grouping by OpenAPI definitions.
+* Fix CounterWindow increase computing issue.
#### UI
* Fix the date component for log conditions.
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
index 3e716a1..ded0dfd 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
@@ -51,4 +51,10 @@
double nv = transform.apply(i._2, i._1);
return newValue(ignored -> nv);
}
+
+ Sample increase(Function2<Double, Long, Double> transform) {
+ Tuple2<Long, Double> i = CounterWindow.INSTANCE.pop(name, labels, value, timestamp);
+ double nv = transform.apply(i._2, i._1);
+ return newValue(ignored -> nv);
+ }
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index b1f8870..f38887b 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -288,7 +288,20 @@
}
public SampleFamily irate() {
- return rate("PT1S");
+ if (this == EMPTY) {
+ return EMPTY;
+ }
+ return SampleFamily.build(
+ this.context,
+ Arrays.stream(samples)
+ .map(sample -> sample.increase(
+ (lowerBoundValue, lowerBoundTime) -> {
+ final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
+ return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
+ }
+ ))
+ .toArray(Sample[]::new)
+ );
}
@SuppressWarnings(value = "unchecked")
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index 46ac2a5..ecdc329 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -22,8 +22,8 @@
import com.google.common.collect.Maps;
import io.vavr.Tuple;
import io.vavr.Tuple2;
-import java.util.LinkedList;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Queue;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
@@ -42,20 +42,47 @@
public static final CounterWindow INSTANCE = new CounterWindow();
+ private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
ID id = new ID(name, labels);
if (!windows.containsKey(id)) {
- windows.put(id, new LinkedList<>());
+ windows.put(id, new PriorityQueue<>());
}
+
Queue<Tuple2<Long, Double>> window = windows.get(id);
window.offer(Tuple.of(now, value));
- Tuple2<Long, Double> ps = window.element();
- if ((now - ps._1) >= windowSize) {
- window.remove();
+ long waterLevel = now - windowSize;
+ Tuple2<Long, Double> peek = window.peek();
+ if (peek._1 > waterLevel) {
+ return peek;
}
- return ps;
+
+ Tuple2<Long, Double> result = peek;
+ while (peek._1 < waterLevel) {
+ result = window.poll();
+ peek = window.element();
+ }
+
+ // Choose the closed slot to the expected timestamp
+ if (waterLevel - result._1 <= peek._1 - waterLevel) {
+ return result;
+ }
+
+ return peek;
+ }
+
+ public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) {
+ ID id = new ID(name, labels);
+
+ Tuple2<Long, Double> element = Tuple.of(now, value);
+ Tuple2<Long, Double> result = lastElementMap.get(id);
+ lastElementMap.put(id, element);
+ if (result == null) {
+ return element;
+ }
+ return result;
}
public void reset() {
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java
new file mode 100644
index 0000000..36e7749
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.vavr.Tuple2;
+import java.time.Duration;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.time.Instant.parse;
+
+public class CounterWindowTest {
+
+ public static List<Tuple2<Long, Double>> parameters() {
+ return Lists.newArrayList(
+ new Tuple2<>(parse("2020-09-11T11:11:01.03Z").toEpochMilli(), 10d),
+ new Tuple2<>(parse("2020-09-11T11:11:15.99Z").toEpochMilli(), 11d),
+ new Tuple2<>(parse("2020-09-11T11:11:31.00Z").toEpochMilli(), 12d),
+ new Tuple2<>(parse("2020-09-11T11:11:46.09Z").toEpochMilli(), 13d),
+ new Tuple2<>(parse("2020-09-11T11:12:00.97Z").toEpochMilli(), 14d),
+ new Tuple2<>(parse("2020-09-11T11:11:00.97Z").toEpochMilli(), 15d),
+ new Tuple2<>(parse("2020-09-11T11:12:16.60Z").toEpochMilli(), 16d),
+ new Tuple2<>(parse("2020-09-11T11:12:31.66Z").toEpochMilli(), 17d)
+ );
+ }
+
+ @Test
+ public void testPT15S() {
+ double[] actuals = parameters().stream().mapToDouble(e -> {
+ Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+ "test", ImmutableMap.<String, String>builder().build(), e._2,
+ Duration.parse("PT15S").getSeconds() * 1000, e._1
+ );
+ return e._2 - increase._2;
+ }).toArray();
+
+ Assert.assertArrayEquals(new double[] {0, 1d, 1d, 1d, 1d, 0d, 2d, 1d}, actuals, 0.d);
+ }
+
+ @Test
+ public void testPT35S() {
+ double[] actuals = parameters().stream().mapToDouble(e -> {
+ Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+ "test", ImmutableMap.<String, String>builder().build(), e._2,
+ Duration.parse("PT35S").getSeconds() * 1000, e._1
+ );
+ return e._2 - increase._2;
+ }).toArray();
+
+ Assert.assertArrayEquals(new double[] {0, 1d, 2d, 2d, 2d, 0d, 3d, 3d}, actuals, 0.d);
+ }
+
+ @Test
+ public void testPT1M() {
+ double[] actuals = parameters().stream().mapToDouble(e -> {
+ Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+ "test", ImmutableMap.<String, String>builder().build(), e._2,
+ Duration.parse("PT1M").getSeconds() * 1000, e._1
+ );
+ return e._2 - increase._2;
+ }).toArray();
+
+ Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 5d, 5d}, actuals, 0.d);
+ }
+
+ @Test
+ public void testPT2M() {
+ double[] actuals = parameters().stream().mapToDouble(e -> {
+ Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+ "test", ImmutableMap.<String, String>builder().build(), e._2,
+ Duration.parse("PT2M").getSeconds() * 1000, e._1
+ );
+ return e._2 - increase._2;
+ }).toArray();
+
+ Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 1d, 2d}, actuals, 0.d);
+ }
+}