Fix thread unsafe problem in server-alarm-plugin (#4230) (#4247)
* fix thread unsafe problem in server-alarm-plugin (#4230)
* remove redundant #moveTo
Co-authored-by: kezhenxu94 <kezhenxu94@163.com>
Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
index 8b6f86c..5fe7471 100644
--- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
+++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RunningRule.java
@@ -18,19 +18,9 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.IntValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
@@ -39,6 +29,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* RunningRule represents each rule in running status. Based on the {@link AlarmRule} definition,
*
@@ -86,7 +83,7 @@
* Receive metrics result from persistence, after it is saved into storage. In alarm, only minute dimensionality
* metrics are expected to process.
*
- * @param meta of input metrics
+ * @param meta of input metrics
* @param metrics includes the values.
*/
public void in(MetaInAlarm meta, Metrics metrics) {
@@ -130,11 +127,8 @@
Window window = windows.get(meta);
if (window == null) {
window = new Window(period);
- LocalDateTime timeBucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(metrics.getTimeBucket() + "");
- window.moveTo(timeBucket);
windows.put(meta, window);
}
-
window.add(metrics);
}
}
@@ -227,18 +221,17 @@
public void add(Metrics metrics) {
long bucket = metrics.getTimeBucket();
- LocalDateTime timebucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");
+ LocalDateTime timeBucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");
- int minutes = Minutes.minutesBetween(timebucket, endTime).getMinutes();
- if (minutes == -1) {
- this.moveTo(timebucket);
-
- }
-
- lock.lock();
+ this.lock.lock();
try {
+ if (this.endTime == null) {
+ init();
+ this.endTime = timeBucket;
+ }
+ int minutes = Minutes.minutesBetween(timeBucket, this.endTime).getMinutes();
if (minutes < 0) {
- moveTo(timebucket);
+ this.moveTo(timeBucket);
minutes = 0;
}
@@ -248,9 +241,9 @@
return;
}
- values.set(values.size() - minutes - 1, metrics);
+ this.values.set(values.size() - minutes - 1, metrics);
} finally {
- lock.unlock();
+ this.lock.unlock();
}
}
@@ -290,28 +283,28 @@
switch (valueType) {
case LONG:
- long lvalue = ((LongValueHolder)metrics).getValue();
+ long lvalue = ((LongValueHolder) metrics).getValue();
long lexpected = RunningRule.this.threshold.getLongThreshold();
if (op.test(lexpected, lvalue)) {
matchCount++;
}
break;
case INT:
- int ivalue = ((IntValueHolder)metrics).getValue();
+ int ivalue = ((IntValueHolder) metrics).getValue();
int iexpected = RunningRule.this.threshold.getIntThreshold();
if (op.test(iexpected, ivalue)) {
matchCount++;
}
break;
case DOUBLE:
- double dvalue = ((DoubleValueHolder)metrics).getValue();
+ double dvalue = ((DoubleValueHolder) metrics).getValue();
double dexpected = RunningRule.this.threshold.getDoubleThreshold();
if (op.test(dexpected, dvalue)) {
matchCount++;
}
break;
case MULTI_INTS:
- int[] ivalueArray = ((MultiIntValuesHolder)metrics).getValues();
+ int[] ivalueArray = ((MultiIntValuesHolder) metrics).getValues();
Integer[] iaexpected = RunningRule.this.threshold.getIntValuesThreshold();
for (int i = 0; i < ivalueArray.length; i++) {
ivalue = ivalueArray[i];