feat: wrapper sliding window with custom aggregator (#2358)

diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go
new file mode 100644
index 0000000..0f31f3f
--- /dev/null
+++ b/metrics/util/aggregate/aggregator.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+// TimeWindowAggregator wrappers sliding window to aggregate data.
+//
+// It is concurrent-safe.
+// It uses custom struct aggregator to aggregate data.
+// The window is divided into several panes, and each pane's value is an aggregator instance.
+type TimeWindowAggregator struct {
+	window *slidingWindow
+	mux    sync.RWMutex
+}
+
+func NewTimeWindowAggregator(paneCount int, timeWindowSeconds int64) *TimeWindowAggregator {
+	return &TimeWindowAggregator{
+		window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
+	}
+}
+
+type Result struct {
+	Total float64
+	Min   float64
+	Max   float64
+	Avg   float64
+	Count uint64
+}
+
+// Result returns the aggregate result of the sliding window by aggregating all panes.
+func (t *TimeWindowAggregator) Result() *Result {
+	t.mux.RLock()
+	defer t.mux.RUnlock()
+
+	res := &Result{}
+
+	total := 0.0
+	count := uint64(0)
+	max := math.SmallestNonzeroFloat64
+	min := math.MaxFloat64
+
+	for _, v := range t.window.values(time.Now().UnixMilli()) {
+		total += v.(*aggregator).total
+		count += v.(*aggregator).count
+		max = math.Max(max, v.(*aggregator).max)
+		min = math.Min(min, v.(*aggregator).min)
+	}
+
+	if count > 0 {
+		res.Avg = total / float64(count)
+		res.Count = count
+		res.Total = total
+		res.Max = max
+		res.Min = min
+	}
+
+	return res
+}
+
+// Add adds a value to the sliding window's current pane.
+func (t *TimeWindowAggregator) Add(v float64) {
+	t.mux.Lock()
+	defer t.mux.Unlock()
+
+	t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v)
+}
+
+func (t *TimeWindowAggregator) newEmptyValue() interface{} {
+	return newAggregator()
+}
+
+// aggregator is a custom struct to aggregate data.
+//
+// It is NOT concurrent-safe.
+// It aggregates data by calculating the min, max, total and count.
+type aggregator struct {
+	min   float64
+	max   float64
+	total float64
+	count uint64
+}
+
+func newAggregator() *aggregator {
+	return &aggregator{
+		min:   math.MaxFloat64,
+		max:   math.SmallestNonzeroFloat64,
+		total: float64(0),
+		count: uint64(0),
+	}
+}
+
+func (a *aggregator) add(v float64) {
+	a.updateMin(v)
+	a.updateMax(v)
+	a.updateTotal(v)
+	a.updateCount()
+}
+
+func (a *aggregator) updateMin(v float64) {
+	a.min = math.Min(a.min, v)
+}
+
+func (a *aggregator) updateMax(v float64) {
+	a.max = math.Max(a.max, v)
+}
+
+func (a *aggregator) updateTotal(v float64) {
+	a.total += v
+}
+
+func (a *aggregator) updateCount() {
+	a.count++
+}
diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go
new file mode 100644
index 0000000..d3fa4f6
--- /dev/null
+++ b/metrics/util/aggregate/aggregator_test.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+	"math/rand"
+	"reflect"
+	"sync"
+	"testing"
+)
+
+func TestTimeWindowAggregatorAddAndResult(t *testing.T) {
+	timeWindowAggregator := NewTimeWindowAggregator(10, 1)
+	timeWindowAggregator.Add(10)
+	timeWindowAggregator.Add(20)
+	timeWindowAggregator.Add(30)
+
+	tests := []struct {
+		name string
+		want *Result
+	}{
+		{
+			name: "Result",
+			want: &Result{
+				Total: 60,
+				Min:   10,
+				Max:   30,
+				Avg:   20,
+				Count: 3,
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := timeWindowAggregator.Result(); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("Result() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func BenchmarkTimeWindowAggregatorAdd(b *testing.B) {
+	wg := sync.WaitGroup{}
+	tw := NewTimeWindowAggregator(10, 1)
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			tw.Add(rand.Float64() * 100)
+		}()
+	}
+	wg.Wait()
+}
+
+func BenchmarkTimeWindowAggregatorResult(b *testing.B) {
+	wg := sync.WaitGroup{}
+	tw := NewTimeWindowAggregator(10, 1)
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			tw.Add(rand.Float64() * 100)
+		}()
+		go func() {
+			defer wg.Done()
+			tw.Result()
+		}()
+	}
+	wg.Wait()
+}
diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go
index 1a78ba5..c5aa4fd 100644
--- a/metrics/util/aggregate/quantile.go
+++ b/metrics/util/aggregate/quantile.go
@@ -28,6 +28,7 @@
 
 // TimeWindowQuantile wrappers sliding window around T-Digest.
 //
+// It is concurrent safe.
 // It uses T-Digest algorithm to calculate quantile.
 // The window is divided into several panes, and each pane's value is a TDigest instance.
 type TimeWindowQuantile struct {
diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go
index feda921..42e2f08 100644
--- a/metrics/util/aggregate/sliding_window.go
+++ b/metrics/util/aggregate/sliding_window.go
@@ -19,7 +19,7 @@
 
 // SlidingWindow adopts sliding window algorithm for statistics.
 //
-// It is not thread-safe.
+// It is NOT concurrent-safe.
 // A window contains paneCount panes.
 // intervalInMs = paneCount * paneIntervalInMs.
 type slidingWindow struct {
@@ -61,7 +61,7 @@
 	return timeMillis-pane.startInMs > s.intervalInMs
 }
 
-// currentPane get the pane at the specified timestamp in milliseconds.
+// currentPane get the pane at the specified timestamp or create a new one if the pane is deprecated.
 func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane {
 	if timeMillis < 0 {
 		return nil