feat: wrapper sliding window with custom aggregator (#2358)
diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go
new file mode 100644
index 0000000..0f31f3f
--- /dev/null
+++ b/metrics/util/aggregate/aggregator.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+ "math"
+ "sync"
+ "time"
+)
+
+// TimeWindowAggregator wrappers sliding window to aggregate data.
+//
+// It is concurrent-safe.
+// It uses custom struct aggregator to aggregate data.
+// The window is divided into several panes, and each pane's value is an aggregator instance.
+type TimeWindowAggregator struct {
+ window *slidingWindow
+ mux sync.RWMutex
+}
+
+func NewTimeWindowAggregator(paneCount int, timeWindowSeconds int64) *TimeWindowAggregator {
+ return &TimeWindowAggregator{
+ window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
+ }
+}
+
+type Result struct {
+ Total float64
+ Min float64
+ Max float64
+ Avg float64
+ Count uint64
+}
+
+// Result returns the aggregate result of the sliding window by aggregating all panes.
+func (t *TimeWindowAggregator) Result() *Result {
+ t.mux.RLock()
+ defer t.mux.RUnlock()
+
+ res := &Result{}
+
+ total := 0.0
+ count := uint64(0)
+ max := math.SmallestNonzeroFloat64
+ min := math.MaxFloat64
+
+ for _, v := range t.window.values(time.Now().UnixMilli()) {
+ total += v.(*aggregator).total
+ count += v.(*aggregator).count
+ max = math.Max(max, v.(*aggregator).max)
+ min = math.Min(min, v.(*aggregator).min)
+ }
+
+ if count > 0 {
+ res.Avg = total / float64(count)
+ res.Count = count
+ res.Total = total
+ res.Max = max
+ res.Min = min
+ }
+
+ return res
+}
+
+// Add adds a value to the sliding window's current pane.
+func (t *TimeWindowAggregator) Add(v float64) {
+ t.mux.Lock()
+ defer t.mux.Unlock()
+
+ t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v)
+}
+
+func (t *TimeWindowAggregator) newEmptyValue() interface{} {
+ return newAggregator()
+}
+
+// aggregator is a custom struct to aggregate data.
+//
+// It is NOT concurrent-safe.
+// It aggregates data by calculating the min, max, total and count.
+type aggregator struct {
+ min float64
+ max float64
+ total float64
+ count uint64
+}
+
+func newAggregator() *aggregator {
+ return &aggregator{
+ min: math.MaxFloat64,
+ max: math.SmallestNonzeroFloat64,
+ total: float64(0),
+ count: uint64(0),
+ }
+}
+
+func (a *aggregator) add(v float64) {
+ a.updateMin(v)
+ a.updateMax(v)
+ a.updateTotal(v)
+ a.updateCount()
+}
+
+func (a *aggregator) updateMin(v float64) {
+ a.min = math.Min(a.min, v)
+}
+
+func (a *aggregator) updateMax(v float64) {
+ a.max = math.Max(a.max, v)
+}
+
+func (a *aggregator) updateTotal(v float64) {
+ a.total += v
+}
+
+func (a *aggregator) updateCount() {
+ a.count++
+}
diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go
new file mode 100644
index 0000000..d3fa4f6
--- /dev/null
+++ b/metrics/util/aggregate/aggregator_test.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "testing"
+)
+
+func TestTimeWindowAggregatorAddAndResult(t *testing.T) {
+ timeWindowAggregator := NewTimeWindowAggregator(10, 1)
+ timeWindowAggregator.Add(10)
+ timeWindowAggregator.Add(20)
+ timeWindowAggregator.Add(30)
+
+ tests := []struct {
+ name string
+ want *Result
+ }{
+ {
+ name: "Result",
+ want: &Result{
+ Total: 60,
+ Min: 10,
+ Max: 30,
+ Avg: 20,
+ Count: 3,
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := timeWindowAggregator.Result(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Result() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func BenchmarkTimeWindowAggregatorAdd(b *testing.B) {
+ wg := sync.WaitGroup{}
+ tw := NewTimeWindowAggregator(10, 1)
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ tw.Add(rand.Float64() * 100)
+ }()
+ }
+ wg.Wait()
+}
+
+func BenchmarkTimeWindowAggregatorResult(b *testing.B) {
+ wg := sync.WaitGroup{}
+ tw := NewTimeWindowAggregator(10, 1)
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ tw.Add(rand.Float64() * 100)
+ }()
+ go func() {
+ defer wg.Done()
+ tw.Result()
+ }()
+ }
+ wg.Wait()
+}
diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go
index 1a78ba5..c5aa4fd 100644
--- a/metrics/util/aggregate/quantile.go
+++ b/metrics/util/aggregate/quantile.go
@@ -28,6 +28,7 @@
// TimeWindowQuantile wrappers sliding window around T-Digest.
//
+// It is concurrent safe.
// It uses T-Digest algorithm to calculate quantile.
// The window is divided into several panes, and each pane's value is a TDigest instance.
type TimeWindowQuantile struct {
diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go
index feda921..42e2f08 100644
--- a/metrics/util/aggregate/sliding_window.go
+++ b/metrics/util/aggregate/sliding_window.go
@@ -19,7 +19,7 @@
// SlidingWindow adopts sliding window algorithm for statistics.
//
-// It is not thread-safe.
+// It is NOT concurrent-safe.
// A window contains paneCount panes.
// intervalInMs = paneCount * paneIntervalInMs.
type slidingWindow struct {
@@ -61,7 +61,7 @@
return timeMillis-pane.startInMs > s.intervalInMs
}
-// currentPane get the pane at the specified timestamp in milliseconds.
+// currentPane get the pane at the specified timestamp or create a new one if the pane is deprecated.
func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane {
if timeMillis < 0 {
return nil