Fix thread unsafe problem in server-alarm-plugin (#4230) (#4247)

* fix thread unsafe problem in server-alarm-plugin (#4230)

* remove redundant #moveTo

Co-authored-by: kezhenxu94 <kezhenxu94@163.com>
Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
index 8b6f86c..5fe7471 100644
--- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
+++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
@@ -18,19 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.alarm.provider;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
 import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.IntValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.*;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.joda.time.LocalDateTime;
 import org.joda.time.Minutes;
@@ -39,6 +29,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * RunningRule represents each rule in running status. Based on the {@link AlarmRule} definition,
  *
@@ -86,7 +83,7 @@
      * Receive metrics result from persistence, after it is saved into storage. In alarm, only minute dimensionality
      * metrics are expected to process.
      *
-     * @param meta of input metrics
+     * @param meta    of input metrics
      * @param metrics includes the values.
      */
     public void in(MetaInAlarm meta, Metrics metrics) {
@@ -130,11 +127,8 @@
             Window window = windows.get(meta);
             if (window == null) {
                 window = new Window(period);
-                LocalDateTime timeBucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(metrics.getTimeBucket() + "");
-                window.moveTo(timeBucket);
                 windows.put(meta, window);
             }
-
             window.add(metrics);
         }
     }
@@ -227,18 +221,17 @@
         public void add(Metrics metrics) {
             long bucket = metrics.getTimeBucket();
 
-            LocalDateTime timebucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");
+            LocalDateTime timeBucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");
 
-            int minutes = Minutes.minutesBetween(timebucket, endTime).getMinutes();
-            if (minutes == -1) {
-                this.moveTo(timebucket);
-
-            }
-
-            lock.lock();
+            this.lock.lock();
             try {
+                if (this.endTime == null) {
+                    init();
+                    this.endTime = timeBucket;
+                }
+                int minutes = Minutes.minutesBetween(timeBucket, this.endTime).getMinutes();
                 if (minutes < 0) {
-                    moveTo(timebucket);
+                    this.moveTo(timeBucket);
                     minutes = 0;
                 }
 
@@ -248,9 +241,9 @@
                     return;
                 }
 
-                values.set(values.size() - minutes - 1, metrics);
+                this.values.set(values.size() - minutes - 1, metrics);
             } finally {
-                lock.unlock();
+                this.lock.unlock();
             }
         }
 
@@ -290,28 +283,28 @@
 
                 switch (valueType) {
                     case LONG:
-                        long lvalue = ((LongValueHolder)metrics).getValue();
+                        long lvalue = ((LongValueHolder) metrics).getValue();
                         long lexpected = RunningRule.this.threshold.getLongThreshold();
                         if (op.test(lexpected, lvalue)) {
                             matchCount++;
                         }
                         break;
                     case INT:
-                        int ivalue = ((IntValueHolder)metrics).getValue();
+                        int ivalue = ((IntValueHolder) metrics).getValue();
                         int iexpected = RunningRule.this.threshold.getIntThreshold();
                         if (op.test(iexpected, ivalue)) {
                             matchCount++;
                         }
                         break;
                     case DOUBLE:
-                        double dvalue = ((DoubleValueHolder)metrics).getValue();
+                        double dvalue = ((DoubleValueHolder) metrics).getValue();
                         double dexpected = RunningRule.this.threshold.getDoubleThreshold();
                         if (op.test(dexpected, dvalue)) {
                             matchCount++;
                         }
                         break;
                     case MULTI_INTS:
-                        int[] ivalueArray = ((MultiIntValuesHolder)metrics).getValues();
+                        int[] ivalueArray = ((MultiIntValuesHolder) metrics).getValues();
                         Integer[] iaexpected = RunningRule.this.threshold.getIntValuesThreshold();
                         for (int i = 0; i < ivalueArray.length; i++) {
                             ivalue = ivalueArray[i];