eliminate `now` error at a fixed rate schedule executor (#7145)

diff --git a/CHANGES.md b/CHANGES.md
index 598f0d4..dae2bc0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -42,6 +42,7 @@
 * Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
 * OAL supports generating metrics from events.
 * Support endpoint name grouping by OpenAPI definitions.
+* Fix CounterWindow increase computing issue.
 
 #### UI
 * Fix the date component for log conditions.
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
index 3e716a1..ded0dfd 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/Sample.java
@@ -51,4 +51,10 @@
         double nv = transform.apply(i._2, i._1);
         return newValue(ignored -> nv);
     }
+
+    Sample increase(Function2<Double, Long, Double> transform) {
+        Tuple2<Long, Double> i = CounterWindow.INSTANCE.pop(name, labels, value, timestamp);
+        double nv = transform.apply(i._2, i._1);
+        return newValue(ignored -> nv);
+    }
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index b1f8870..f38887b 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -288,7 +288,20 @@
     }
 
     public SampleFamily irate() {
-        return rate("PT1S");
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        return SampleFamily.build(
+            this.context,
+            Arrays.stream(samples)
+                  .map(sample -> sample.increase(
+                      (lowerBoundValue, lowerBoundTime) -> {
+                          final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
+                          return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
+                      }
+                  ))
+                  .toArray(Sample[]::new)
+        );
     }
 
     @SuppressWarnings(value = "unchecked")
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index 46ac2a5..ecdc329 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -22,8 +22,8 @@
 import com.google.common.collect.Maps;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
-import java.util.LinkedList;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Queue;
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
@@ -42,20 +42,47 @@
 
     public static final CounterWindow INSTANCE = new CounterWindow();
 
+    private final Map<ID, Tuple2<Long, Double>> lastElementMap = Maps.newHashMap();
     private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
 
     public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
         ID id = new ID(name, labels);
         if (!windows.containsKey(id)) {
-            windows.put(id, new LinkedList<>());
+            windows.put(id, new PriorityQueue<>());
         }
+
         Queue<Tuple2<Long, Double>> window = windows.get(id);
         window.offer(Tuple.of(now, value));
-        Tuple2<Long, Double> ps = window.element();
-        if ((now - ps._1) >= windowSize) {
-            window.remove();
+        long waterLevel = now - windowSize;
+        Tuple2<Long, Double> peek = window.peek();
+        if (peek._1 > waterLevel) {
+            return peek;
         }
-        return ps;
+
+        Tuple2<Long, Double> result = peek;
+        while (peek._1 < waterLevel) {
+            result = window.poll();
+            peek = window.element();
+        }
+
+        // Choose the closed slot to the expected timestamp
+        if (waterLevel - result._1 <= peek._1 - waterLevel) {
+            return result;
+        }
+
+        return peek;
+    }
+
+    public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) {
+        ID id = new ID(name, labels);
+
+        Tuple2<Long, Double> element = Tuple.of(now, value);
+        Tuple2<Long, Double> result = lastElementMap.get(id);
+        lastElementMap.put(id, element);
+        if (result == null) {
+            return element;
+        }
+        return result;
     }
 
     public void reset() {
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java
new file mode 100644
index 0000000..36e7749
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindowTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.counter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.vavr.Tuple2;
+import java.time.Duration;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.time.Instant.parse;
+
+public class CounterWindowTest {
+
+    public static List<Tuple2<Long, Double>> parameters() {
+        return Lists.newArrayList(
+            new Tuple2<>(parse("2020-09-11T11:11:01.03Z").toEpochMilli(), 10d),
+            new Tuple2<>(parse("2020-09-11T11:11:15.99Z").toEpochMilli(), 11d),
+            new Tuple2<>(parse("2020-09-11T11:11:31.00Z").toEpochMilli(), 12d),
+            new Tuple2<>(parse("2020-09-11T11:11:46.09Z").toEpochMilli(), 13d),
+            new Tuple2<>(parse("2020-09-11T11:12:00.97Z").toEpochMilli(), 14d),
+            new Tuple2<>(parse("2020-09-11T11:11:00.97Z").toEpochMilli(), 15d),
+            new Tuple2<>(parse("2020-09-11T11:12:16.60Z").toEpochMilli(), 16d),
+            new Tuple2<>(parse("2020-09-11T11:12:31.66Z").toEpochMilli(), 17d)
+        );
+    }
+
+    @Test
+    public void testPT15S() {
+        double[] actuals = parameters().stream().mapToDouble(e -> {
+            Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+                "test", ImmutableMap.<String, String>builder().build(), e._2,
+                Duration.parse("PT15S").getSeconds() * 1000, e._1
+            );
+            return e._2 - increase._2;
+        }).toArray();
+
+        Assert.assertArrayEquals(new double[] {0, 1d, 1d, 1d, 1d, 0d, 2d, 1d}, actuals, 0.d);
+    }
+
+    @Test
+    public void testPT35S() {
+        double[] actuals = parameters().stream().mapToDouble(e -> {
+            Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+                "test", ImmutableMap.<String, String>builder().build(), e._2,
+                Duration.parse("PT35S").getSeconds() * 1000, e._1
+            );
+            return e._2 - increase._2;
+        }).toArray();
+
+        Assert.assertArrayEquals(new double[] {0, 1d, 2d, 2d, 2d, 0d, 3d, 3d}, actuals, 0.d);
+    }
+
+    @Test
+    public void testPT1M() {
+        double[] actuals = parameters().stream().mapToDouble(e -> {
+            Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+                "test", ImmutableMap.<String, String>builder().build(), e._2,
+                Duration.parse("PT1M").getSeconds() * 1000, e._1
+            );
+            return e._2 - increase._2;
+        }).toArray();
+
+        Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 5d, 5d}, actuals, 0.d);
+    }
+
+    @Test
+    public void testPT2M() {
+        double[] actuals = parameters().stream().mapToDouble(e -> {
+            Tuple2<Long, Double> increase = CounterWindow.INSTANCE.increase(
+                "test", ImmutableMap.<String, String>builder().build(), e._2,
+                Duration.parse("PT2M").getSeconds() * 1000, e._1
+            );
+            return e._2 - increase._2;
+        }).toArray();
+
+        Assert.assertArrayEquals(new double[] {0, 1d, 2d, 3d, 4d, 0d, 1d, 2d}, actuals, 0.d);
+    }
+}