feat: auto concurrency limiter of adaptive service (#2114)
* AutoConcurrencyLimiter with brpc algorithm
* go mod
* license
* fix test timeout
* add warn and rename some var
* remove unreachable branch
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 11c5015..8be46b7 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -78,7 +78,7 @@
// limiter is not found on the mapper, just create
// a new limiter
if l, err = limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(),
- invocation.MethodName(), limiter.HillClimbingLimiter); err != nil {
+ invocation.MethodName(), limiter.AutoConcurrencyLimiter); err != nil {
return &protocol.RPCResult{Err: wrapErrAdaptiveSvcInterrupted(err)}
}
} else {
@@ -144,9 +144,6 @@
return &protocol.RPCResult{Err: err}
}
- // set attachments to inform consumer of provider status
- result.AddAttachment(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
- result.AddAttachment(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.",
constant.AdaptiveServiceRemainingKey, l.Remaining(),
constant.AdaptiveServiceInflightKey, l.Inflight())
diff --git a/filter/adaptivesvc/limiter/auto_concurrency_limiter.go b/filter/adaptivesvc/limiter/auto_concurrency_limiter.go
new file mode 100644
index 0000000..bc43ee9
--- /dev/null
+++ b/filter/adaptivesvc/limiter/auto_concurrency_limiter.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package limiter
+
+import (
+ "math"
+ "math/rand"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ "go.uber.org/atomic"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter/cpu"
+)
+
+var (
+ _ Limiter = (*AutoConcurrency)(nil)
+ _ Updater = (*AutoConcurrencyUpdater)(nil)
+ cpuLoad *atomic.Uint64 = atomic.NewUint64(0) // range from 0 to 1000
+)
+
+// These parameters may need to be different between services
+const (
+ MaxExploreRatio = 0.3
+ MinExploreRatio = 0.06
+ SampleWindowSizeMs = 1000
+ MinSampleCount = 40
+ MaxSampleCount = 500
+ CPUDecay = 0.95
+)
+
+type AutoConcurrency struct {
+ sync.RWMutex
+
+ exploreRatio float64
+ emaFactor float64
+ noLoadLatency float64 // duration
+ maxQPS float64
+ halfSampleIntervalMS int64
+ maxConcurrency uint64
+
+ // metrics of the current round
+ startSampleTimeUs int64
+ lastSamplingTimeUs *atomic.Int64
+ resetLatencyUs int64 // time to reset noLoadLatency
+ remeasureStartUs int64 // time to reset req data (sampleCount, totalSampleUs, totalReqCount)
+ sampleCount int64
+ totalSampleUs int64
+ totalReqCount *atomic.Int64
+
+ inflight *atomic.Uint64
+}
+
+func init() {
+ go cpuproc()
+}
+
+// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
+func cpuproc() {
+ ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
+ defer func() {
+ ticker.Stop()
+ if err := recover(); err != nil {
+ logger.Warnf("cpu usage collector panic: %v", err)
+ go cpuproc()
+ }
+ }()
+
+ for range ticker.C {
+ usage := cpu.CpuUsage()
+ prevCPU := cpuLoad.Load()
+ curCPU := uint64(float64(prevCPU)*CPUDecay + float64(usage)*(1.0-CPUDecay))
+ logger.Debugf("current cpu usage: %d", curCPU)
+ cpuLoad.Store(curCPU)
+ }
+}
+
+func CPUUsage() uint64 {
+ return cpuLoad.Load()
+}
+
+func NewAutoConcurrencyLimiter() *AutoConcurrency {
+ l := &AutoConcurrency{
+ exploreRatio: MaxExploreRatio,
+ emaFactor: 0.1,
+ noLoadLatency: -1,
+ maxQPS: -1,
+ maxConcurrency: 40,
+ halfSampleIntervalMS: 25000,
+ resetLatencyUs: 0,
+ inflight: atomic.NewUint64(0),
+ lastSamplingTimeUs: atomic.NewInt64(0),
+ totalReqCount: atomic.NewInt64(0),
+ }
+ l.remeasureStartUs = l.NextResetTime(time.Now().UnixNano() / 1e3)
+ return l
+}
+
+func (l *AutoConcurrency) updateNoLoadLatency(latency float64) {
+ emaFactor := l.emaFactor
+ if l.noLoadLatency <= 0 {
+ l.noLoadLatency = latency
+ } else if latency < l.noLoadLatency {
+ l.noLoadLatency = latency*emaFactor + l.noLoadLatency*(1-emaFactor)
+ }
+}
+
+func (l *AutoConcurrency) updateQPS(qps float64) {
+ emaFactor := l.emaFactor / 10
+ if l.maxQPS <= qps {
+ l.maxQPS = qps
+ } else {
+ l.maxQPS = qps*emaFactor + l.maxQPS*(1-emaFactor)
+ }
+}
+
+func (l *AutoConcurrency) updateMaxConcurrency(v uint64) {
+ if l.maxConcurrency <= v {
+ l.maxConcurrency = v
+ } else {
+ l.maxConcurrency = uint64(float64(v)*l.emaFactor + float64(l.maxConcurrency)*(1-l.emaFactor))
+ }
+}
+
+func (l *AutoConcurrency) Inflight() uint64 {
+ return l.inflight.Load()
+}
+
+func (l *AutoConcurrency) Remaining() uint64 {
+ return l.maxConcurrency - l.inflight.Load()
+}
+
+func (l *AutoConcurrency) Acquire() (Updater, error) {
+ now := time.Now()
+ if l.inflight.Inc() > l.maxConcurrency && CPUUsage() >= 500 { // only when cpu load is above 50%
+ l.inflight.Dec()
+ return nil, ErrReachLimitation
+ }
+ u := &AutoConcurrencyUpdater{
+ startTime: now,
+ limiter: l,
+ }
+ return u, nil
+}
+
+func (l *AutoConcurrency) Reset(startTimeUs int64) {
+ l.startSampleTimeUs = startTimeUs
+ l.sampleCount = 0
+ l.totalSampleUs = 0
+ l.totalReqCount.Store(0)
+}
+
+func (l *AutoConcurrency) NextResetTime(samplingTimeUs int64) int64 {
+ return samplingTimeUs + (l.halfSampleIntervalMS+rand.Int63n(l.halfSampleIntervalMS))*1000
+}
+
+func (l *AutoConcurrency) Update(latency int64, samplingTimeUs int64) {
+ l.Lock()
+ defer l.Unlock()
+ if l.resetLatencyUs != 0 { // wait to reset noLoadLatency and other data
+ if l.resetLatencyUs > samplingTimeUs {
+ return
+ }
+ l.noLoadLatency = -1
+ l.resetLatencyUs = 0
+ l.remeasureStartUs = l.NextResetTime(samplingTimeUs)
+ l.Reset(samplingTimeUs)
+ }
+
+ if l.startSampleTimeUs == 0 {
+ l.startSampleTimeUs = samplingTimeUs
+ }
+
+ l.sampleCount++
+ l.totalSampleUs += latency
+
+ logger.Debugf("[Auto Concurrency Limiter Test] samplingTimeUs: %v, startSampleTimeUs: %v", samplingTimeUs, l.startSampleTimeUs)
+
+ if l.sampleCount < MinSampleCount {
+ if samplingTimeUs-l.startSampleTimeUs >= SampleWindowSizeMs*1000 { // QPS is too small
+ l.Reset(samplingTimeUs)
+ }
+ return
+ }
+
+ logger.Debugf("[Auto Concurrency Limiter Test] samplingTimeUs: %v, startSampleTimeUs: %v", samplingTimeUs, l.startSampleTimeUs)
+
+ // sampling time is too short. If sample count is bigger than MaxSampleCount, just update.
+ if samplingTimeUs-l.startSampleTimeUs < SampleWindowSizeMs*1000 && l.sampleCount < MaxSampleCount {
+ return
+ }
+
+ qps := float64(l.totalReqCount.Load()) * 1000000.0 / float64(samplingTimeUs-l.startSampleTimeUs)
+ l.updateQPS(qps)
+
+ avgLatency := l.totalSampleUs / l.sampleCount
+ l.updateNoLoadLatency(float64(avgLatency))
+
+ nextMaxConcurrency := uint64(0)
+ if l.remeasureStartUs <= samplingTimeUs { // should reset
+ l.Reset(samplingTimeUs)
+ l.resetLatencyUs = samplingTimeUs + avgLatency*2
+ nextMaxConcurrency = uint64(math.Ceil(l.maxQPS * l.noLoadLatency * 0.9 / 1000000))
+ } else {
+ // use explore ratio to adjust MaxConcurrency
+ if float64(avgLatency) <= l.noLoadLatency*(1.0+MinExploreRatio) ||
+ qps >= l.maxQPS*(1.0+MinExploreRatio) {
+ l.exploreRatio = math.Min(MaxExploreRatio, l.exploreRatio+0.02)
+ } else {
+ l.exploreRatio = math.Max(MinExploreRatio, l.exploreRatio-0.02)
+ }
+ nextMaxConcurrency = uint64(math.Ceil(l.noLoadLatency * l.maxQPS * (1 + l.exploreRatio) / 1000000))
+ }
+ l.maxConcurrency = nextMaxConcurrency
+
+ // maxConcurrency should be no less than 1
+ if l.maxConcurrency <= 0 {
+ l.maxConcurrency = 1
+ }
+
+ logger.Debugf("[Auto Concurrency Limiter] Qps: %v, NoLoadLatency: %f, MaxConcurrency: %d, limiter: %+v",
+ l.maxQPS, l.noLoadLatency, l.maxConcurrency, l)
+
+ // Update completed, resample
+ l.Reset(samplingTimeUs)
+
+}
+
+type AutoConcurrencyUpdater struct {
+ startTime time.Time
+ limiter *AutoConcurrency
+}
+
+func (u *AutoConcurrencyUpdater) DoUpdate() error {
+ defer func() {
+ u.limiter.inflight.Dec()
+ }()
+ u.limiter.totalReqCount.Add(1)
+ now := time.Now().UnixNano() / 1e3
+ lastSamplingTimeUs := u.limiter.lastSamplingTimeUs.Load()
+ if lastSamplingTimeUs == 0 || now-lastSamplingTimeUs >= 100 {
+ sample := u.limiter.lastSamplingTimeUs.CAS(lastSamplingTimeUs, now)
+ if sample {
+ logger.Debugf("[Auto Concurrency Updater] sample, %v, %v", u.limiter.resetLatencyUs, u.limiter.remeasureStartUs)
+ latency := now - u.startTime.UnixNano()/1e3
+ u.limiter.Update(latency, now)
+ }
+ }
+
+ return nil
+}
diff --git a/filter/adaptivesvc/limiter/cpu/cgroup.go b/filter/adaptivesvc/limiter/cpu/cgroup.go
new file mode 100644
index 0000000..1c707f4
--- /dev/null
+++ b/filter/adaptivesvc/limiter/cpu/cgroup.go
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpu
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "strconv"
+ "strings"
+)
+
+const cgroupRootDir = "/sys/fs/cgroup"
+
+// cgroup Linux cgroup
+type cgroup struct {
+ cgroupSet map[string]string
+}
+
+// CPUCFSQuotaUs cpu.cfs_quota_us
+func (c *cgroup) CPUCFSQuotaUs() (int64, error) {
+ data, err := readFile(path.Join(c.cgroupSet["cpu"], "cpu.cfs_quota_us"))
+ if err != nil {
+ return 0, err
+ }
+ return strconv.ParseInt(data, 10, 64)
+}
+
+// CPUCFSPeriodUs cpu.cfs_period_us
+func (c *cgroup) CPUCFSPeriodUs() (uint64, error) {
+ data, err := readFile(path.Join(c.cgroupSet["cpu"], "cpu.cfs_period_us"))
+ if err != nil {
+ return 0, err
+ }
+ return parseUint(data)
+}
+
+// CPUAcctUsage cpuacct.usage
+func (c *cgroup) CPUAcctUsage() (uint64, error) {
+ data, err := readFile(path.Join(c.cgroupSet["cpuacct"], "cpuacct.usage"))
+ if err != nil {
+ return 0, err
+ }
+ return parseUint(data)
+}
+
+// CPUAcctUsagePerCPU cpuacct.usage_percpu
+func (c *cgroup) CPUAcctUsagePerCPU() ([]uint64, error) {
+ data, err := readFile(path.Join(c.cgroupSet["cpuacct"], "cpuacct.usage_percpu"))
+ if err != nil {
+ return nil, err
+ }
+ var usage []uint64
+ for _, v := range strings.Fields(string(data)) {
+ var u uint64
+ if u, err = parseUint(v); err != nil {
+ return nil, err
+ }
+ // fix possible_cpu:https://www.ibm.com/support/knowledgecenter/en/linuxonibm/com.ibm.linux.z.lgdd/lgdd_r_posscpusparm.html
+ if u != 0 {
+ usage = append(usage, u)
+ }
+ }
+ return usage, nil
+}
+
+// CPUSetCPUs cpuset.cpus
+func (c *cgroup) CPUSetCPUs() ([]uint64, error) {
+ data, err := readFile(path.Join(c.cgroupSet["cpuset"], "cpuset.cpus"))
+ if err != nil {
+ return nil, err
+ }
+ cpus, err := ParseUintList(data)
+ if err != nil {
+ return nil, err
+ }
+ sets := make([]uint64, 0)
+ for k := range cpus {
+ sets = append(sets, uint64(k))
+ }
+ return sets, nil
+}
+
+// CurrentcGroup get current process cgroup
+func currentcGroup() (*cgroup, error) {
+ pid := os.Getpid()
+ cgroupFile := fmt.Sprintf("/proc/%d/cgroup", pid)
+ cgroupSet := make(map[string]string)
+ fp, err := os.Open(cgroupFile)
+ if err != nil {
+ return nil, err
+ }
+ defer fp.Close()
+ buf := bufio.NewReader(fp)
+ for {
+ line, err := buf.ReadString('\n')
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return nil, err
+ }
+ col := strings.Split(strings.TrimSpace(line), ":")
+ if len(col) != 3 {
+ return nil, fmt.Errorf("invalid cgroup format %s", line)
+ }
+ dir := col[2]
+ // When dir is not equal to /, it must be in docker
+ if dir != "/" {
+ cgroupSet[col[1]] = path.Join(cgroupRootDir, col[1])
+ if strings.Contains(col[1], ",") {
+ for _, k := range strings.Split(col[1], ",") {
+ cgroupSet[k] = path.Join(cgroupRootDir, k)
+ }
+ }
+ } else {
+ cgroupSet[col[1]] = path.Join(cgroupRootDir, col[1], col[2])
+ if strings.Contains(col[1], ",") {
+ for _, k := range strings.Split(col[1], ",") {
+ cgroupSet[k] = path.Join(cgroupRootDir, k, col[2])
+ }
+ }
+ }
+ }
+ return &cgroup{cgroupSet: cgroupSet}, nil
+}
diff --git a/filter/adaptivesvc/limiter/cpu/cgroup_cpu.go b/filter/adaptivesvc/limiter/cpu/cgroup_cpu.go
new file mode 100644
index 0000000..99be1ca
--- /dev/null
+++ b/filter/adaptivesvc/limiter/cpu/cgroup_cpu.go
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpu
+
+import (
+ "bufio"
+ "errors"
+ "os"
+ "strconv"
+ "strings"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ pscpu "github.com/shirou/gopsutil/v3/cpu"
+)
+
+type cgroupCPU struct {
+ frequency uint64
+ quota float64
+ cores uint64
+
+ preSystem uint64
+ preTotal uint64
+}
+
+func newCgroupCPU() (cpu *cgroupCPU, err error) {
+ cores, err := pscpu.Counts(true)
+ if err != nil || cores == 0 {
+ var cpus []uint64
+ cpus, err = perCPUUsage()
+ if err != nil {
+ return nil, err
+ }
+ cores = len(cpus)
+ }
+
+ sets, err := cpuSets()
+ if err != nil {
+ return
+ }
+ quota := float64(len(sets))
+ cq, err := cpuQuota()
+ if err == nil && cq != -1 {
+ var period uint64
+ if period, err = cpuPeriod(); err != nil {
+ return
+ }
+ limit := float64(cq) / float64(period)
+ if limit < quota {
+ quota = limit
+ }
+ }
+ maxFreq := cpuMaxFreq()
+
+ preSystem, err := systemCPUUsage()
+ if err != nil {
+ return
+ }
+ preTotal, err := totalCPUUsage()
+ if err != nil {
+ return
+ }
+ cpu = &cgroupCPU{
+ frequency: maxFreq,
+ quota: quota,
+ cores: uint64(cores),
+ preSystem: preSystem,
+ preTotal: preTotal,
+ }
+ return
+}
+
+func (cpu *cgroupCPU) Usage() (u uint64, err error) {
+ var (
+ total uint64
+ system uint64
+ )
+ total, err = totalCPUUsage()
+ if err != nil {
+ return
+ }
+ system, err = systemCPUUsage()
+ if err != nil {
+ return
+ }
+ if system != cpu.preSystem {
+ u = uint64(float64((total-cpu.preTotal)*cpu.cores*1e3) / (float64(system-cpu.preSystem) * cpu.quota))
+ }
+ logger.Debugf("CPU: usage %v %+v", u, cpu)
+ cpu.preSystem = system
+ cpu.preTotal = total
+ return
+}
+
+func (cpu *cgroupCPU) Info() Info {
+ return Info{
+ Frequency: cpu.frequency,
+ Quota: cpu.quota,
+ }
+}
+
+const nanoSecondsPerSecond = 1e9
+
+// ErrNoCFSLimit is no quota limit
+var ErrNoCFSLimit = errors.New("no quota limit")
+
+var clockTicksPerSecond = uint64(getClockTicks())
+
+// systemCPUUsage returns the host system's cpu usage in
+// nanoseconds. An error is returned if the format of the underlying
+// file does not match.
+//
+// Uses /proc/stat defined by POSIX. Looks for the cpu
+// statistics line and then sums up the first seven fields
+// provided. See man 5 proc for details on specific field
+// information.
+func systemCPUUsage() (usage uint64, err error) {
+ var (
+ line string
+ f *os.File
+ )
+ if f, err = os.Open("/proc/stat"); err != nil {
+ return
+ }
+ bufReader := bufio.NewReaderSize(nil, 128)
+ defer func() {
+ bufReader.Reset(nil)
+ f.Close()
+ }()
+ bufReader.Reset(f)
+ for err == nil {
+ if line, err = bufReader.ReadString('\n'); err != nil {
+ return
+ }
+ parts := strings.Fields(line)
+ switch parts[0] {
+ case "cpu":
+ if len(parts) < 8 {
+ err = errors.New("bad format of cpu stats")
+ return
+ }
+ var totalClockTicks uint64
+ for _, i := range parts[1:8] {
+ var v uint64
+ if v, err = strconv.ParseUint(i, 10, 64); err != nil {
+ return
+ }
+ totalClockTicks += v
+ }
+ usage = (totalClockTicks * nanoSecondsPerSecond) / clockTicksPerSecond
+ return
+ }
+ }
+ err = errors.New("bad stats format")
+ return
+}
+
+func totalCPUUsage() (usage uint64, err error) {
+ var cg *cgroup
+ if cg, err = currentcGroup(); err != nil {
+ return
+ }
+ return cg.CPUAcctUsage()
+}
+
+func perCPUUsage() (usage []uint64, err error) {
+ var cg *cgroup
+ if cg, err = currentcGroup(); err != nil {
+ return
+ }
+ return cg.CPUAcctUsagePerCPU()
+}
+
+func cpuSets() (sets []uint64, err error) {
+ var cg *cgroup
+ if cg, err = currentcGroup(); err != nil {
+ return
+ }
+ return cg.CPUSetCPUs()
+}
+
+func cpuQuota() (quota int64, err error) {
+ var cg *cgroup
+ if cg, err = currentcGroup(); err != nil {
+ return
+ }
+ return cg.CPUCFSQuotaUs()
+}
+
+func cpuPeriod() (peroid uint64, err error) {
+ var cg *cgroup
+ if cg, err = currentcGroup(); err != nil {
+ return
+ }
+ return cg.CPUCFSPeriodUs()
+}
+
+func cpuFreq() uint64 {
+ lines, err := readLines("/proc/cpuinfo")
+ if err != nil {
+ return 0
+ }
+ for _, line := range lines {
+ fields := strings.Split(line, ":")
+ if len(fields) < 2 {
+ continue
+ }
+ key := strings.TrimSpace(fields[0])
+ value := strings.TrimSpace(fields[1])
+ if key == "cpu MHz" || key == "clock" {
+ // treat this as the fallback value, thus we ignore error
+ if t, err := strconv.ParseFloat(strings.Replace(value, "MHz", "", 1), 64); err == nil {
+ return uint64(t * 1000.0 * 1000.0)
+ }
+ }
+ }
+ return 0
+}
+
+func cpuMaxFreq() uint64 {
+ feq := cpuFreq()
+ data, err := readFile("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq")
+ if err != nil {
+ return feq
+ }
+ // override the max freq from /proc/cpuinfo
+ cfeq, err := parseUint(data)
+ if err == nil {
+ feq = cfeq
+ }
+ return feq
+}
+
+//GetClockTicks get the OS's ticks per second
+func getClockTicks() int {
+ // https://msdn.microsoft.com/en-us/library/windows/desktop/ms644905(v=vs.85).aspx
+ //
+ // An example of its usage can be found here.
+ // https://msdn.microsoft.com/en-us/library/windows/desktop/dn553408(v=vs.85).aspx
+
+ return 100
+}
diff --git a/filter/adaptivesvc/limiter/cpu/stat.go b/filter/adaptivesvc/limiter/cpu/stat.go
new file mode 100644
index 0000000..2a53e61
--- /dev/null
+++ b/filter/adaptivesvc/limiter/cpu/stat.go
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpu
+
+import (
+ "fmt"
+ "time"
+)
+
+import (
+ "go.uber.org/atomic"
+)
+
+const (
+ interval time.Duration = time.Millisecond * 500
+)
+
+var (
+ stats CPU
+ usage = atomic.NewUint64(0)
+)
+
+// CPU is cpu stat usage.
+type CPU interface {
+ Usage() (u uint64, e error)
+ Info() Info
+}
+
+func init() {
+ var (
+ err error
+ )
+ stats, err = newCgroupCPU()
+ if err != nil {
+ panic(fmt.Sprintf("cgroup cpu init failed! err:=%v", err))
+ }
+ go func() {
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ <-ticker.C
+ u, err := stats.Usage()
+ if err == nil && u != 0 {
+ usage.Store(u)
+ }
+ }
+ }()
+}
+
+// Info cpu info.
+type Info struct {
+ Frequency uint64
+ Quota float64
+}
+
+// CpuUsage read cpu stat.
+func CpuUsage() uint64 {
+ return usage.Load()
+}
+
+// GetInfo get cpu info.
+func GetInfo() Info {
+ return stats.Info()
+}
diff --git a/filter/adaptivesvc/limiter/cpu/stat_test.go b/filter/adaptivesvc/limiter/cpu/stat_test.go
new file mode 100644
index 0000000..0e54399
--- /dev/null
+++ b/filter/adaptivesvc/limiter/cpu/stat_test.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpu
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+func TestStat(t *testing.T) {
+ time.Sleep(time.Second * 2)
+ var i Info
+ u := CpuUsage()
+ i = GetInfo()
+ fmt.Printf("cpu:: %+v\n", stats)
+ assert.NotZero(t, u)
+ assert.NotZero(t, i.Frequency)
+ assert.NotZero(t, i.Quota)
+
+ time.Sleep(time.Second * 10)
+}
diff --git a/filter/adaptivesvc/limiter/cpu/utils.go b/filter/adaptivesvc/limiter/cpu/utils.go
new file mode 100644
index 0000000..d8780ca
--- /dev/null
+++ b/filter/adaptivesvc/limiter/cpu/utils.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpu
+
+import (
+ "bufio"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strconv"
+ "strings"
+)
+
+func readFile(path string) (string, error) {
+ contents, err := ioutil.ReadFile(path)
+ if err != nil {
+ return "", err
+ }
+ return strings.TrimSpace(string(contents)), nil
+}
+
+func parseUint(s string) (uint64, error) {
+ v, err := strconv.ParseUint(s, 10, 64)
+ if err != nil {
+ intValue, intErr := strconv.ParseInt(s, 10, 64)
+ // 1. Handle negative values greater than MinInt64 (and)
+ // 2. Handle negative values lesser than MinInt64
+ if intErr == nil && intValue < 0 {
+ return 0, nil
+ } else if intErr != nil &&
+ intErr.(*strconv.NumError).Err == strconv.ErrRange &&
+ intValue < 0 {
+ return 0, nil
+ }
+ return 0, err
+ }
+ return v, nil
+}
+
+// ParseUintList parses and validates the specified string as the value
+// found in some cgroup file (e.g. cpuset.cpus, cpuset.mems), which could be
+// one of the formats below. Note that duplicates are actually allowed in the
+// input string. It returns a map[int]bool with available elements from val
+// set to true.
+// Supported formats:
+// 7
+// 1-6
+// 0,3-4,7,8-10
+// 0-0,0,1-7
+// 03,1-3 <- this is gonna get parsed as [1,2,3]
+// 3,2,1
+// 0-2,3,1
+func ParseUintList(val string) (map[int]bool, error) {
+ if val == "" {
+ return map[int]bool{}, nil
+ }
+
+ availableInts := make(map[int]bool)
+ split := strings.Split(val, ",")
+ errInvalidFormat := fmt.Errorf("os/stat: invalid format: %s", val)
+ for _, r := range split {
+ if !strings.Contains(r, "-") {
+ v, err := strconv.Atoi(r)
+ if err != nil {
+ return nil, errInvalidFormat
+ }
+ availableInts[v] = true
+ } else {
+ split := strings.SplitN(r, "-", 2)
+ min, err := strconv.Atoi(split[0])
+ if err != nil {
+ return nil, errInvalidFormat
+ }
+ max, err := strconv.Atoi(split[1])
+ if err != nil {
+ return nil, errInvalidFormat
+ }
+ if max < min {
+ return nil, errInvalidFormat
+ }
+ for i := min; i <= max; i++ {
+ availableInts[i] = true
+ }
+ }
+ }
+ return availableInts, nil
+}
+
+// ReadLines reads contents from a file and splits them by new lines.
+// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
+func readLines(filename string) ([]string, error) {
+ return readLinesOffsetN(filename, 0, -1)
+}
+
+// ReadLinesOffsetN reads contents from file and splits them by new line.
+// The offset tells at which line number to start.
+// The count determines the number of lines to read (starting from offset):
+// n >= 0: at most n lines
+// n < 0: whole file
+func readLinesOffsetN(filename string, offset uint, n int) ([]string, error) {
+ f, err := os.Open(filename)
+ if err != nil {
+ return []string{""}, err
+ }
+ defer f.Close()
+
+ var ret []string
+
+ r := bufio.NewReader(f)
+ for i := 0; i < n+int(offset) || n < 0; i++ {
+ line, err := r.ReadString('\n')
+ if err != nil {
+ break
+ }
+ if i < int(offset) {
+ continue
+ }
+ ret = append(ret, strings.Trim(line, "\n"))
+ }
+
+ return ret, nil
+}
diff --git a/filter/adaptivesvc/limiter/limiter.go b/filter/adaptivesvc/limiter/limiter.go
index 82424fc..a447478 100644
--- a/filter/adaptivesvc/limiter/limiter.go
+++ b/filter/adaptivesvc/limiter/limiter.go
@@ -29,6 +29,7 @@
const (
HillClimbingLimiter = iota
+ AutoConcurrencyLimiter
)
type Limiter interface {
diff --git a/filter/adaptivesvc/limiter_mapper.go b/filter/adaptivesvc/limiter_mapper.go
index e9343ed..65219b0 100644
--- a/filter/adaptivesvc/limiter_mapper.go
+++ b/filter/adaptivesvc/limiter_mapper.go
@@ -66,6 +66,8 @@
switch limiterType {
case limiter.HillClimbingLimiter:
l = limiter.NewHillClimbing()
+ case limiter.AutoConcurrencyLimiter:
+ l = limiter.NewAutoConcurrencyLimiter()
default:
return nil, ErrLimiterTypeNotFound
}
diff --git a/go.mod b/go.mod
index c217e8b..5250922 100644
--- a/go.mod
+++ b/go.mod
@@ -44,6 +44,7 @@
github.com/polarismesh/polaris-go v1.2.0
github.com/prometheus/client_golang v1.12.2
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
+ github.com/shirou/gopsutil/v3 v3.22.2
github.com/stretchr/testify v1.8.1
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5