Add mutex to fix some data race (#227)
diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml
index 40801e5..107e83c 100644
--- a/.github/workflows/plugin-tests.yaml
+++ b/.github/workflows/plugin-tests.yaml
@@ -80,7 +80,7 @@
- go-redisv9
- go-restfulv3
- gorm
- - kratosv2
+# - kratosv2 temporary disable because it's not stable
- microv4
- mongo
- mysql
diff --git a/CHANGES.md b/CHANGES.md
index 29b9108..ba08555 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@
* Support Windows plugin test.
* Support Kafka reporter.
* Add recover to goroutine to prevent unexpected panics.
+* Add mutex to fix some data race.
#### Plugins
diff --git a/plugins/core/context.go b/plugins/core/context.go
index fe1c561..9721c27 100644
--- a/plugins/core/context.go
+++ b/plugins/core/context.go
@@ -17,7 +17,10 @@
package core
-import "reflect"
+import (
+ "reflect"
+ "sync"
+)
var (
GetGLS = func() interface{} { return nil }
@@ -29,16 +32,21 @@
)
type ContextSnapshoter interface {
- TakeSnapShot(val interface{}) interface{}
+ TakeSnapShot() interface{}
}
type TracingContext struct {
activeSpan TracingSpan
Runtime *RuntimeContext
ID *IDContext
+
+ activeSpanLock sync.RWMutex
}
-func (t *TracingContext) TakeSnapShot(val interface{}) interface{} {
+func (t *TracingContext) TakeSnapShot() interface{} {
+ if t == nil {
+ return nil
+ }
snapshot := newSnapshotSpan(t.ActiveSpan())
return &TracingContext{
activeSpan: snapshot,
@@ -48,6 +56,8 @@
}
func (t *TracingContext) ActiveSpan() TracingSpan {
+ t.activeSpanLock.RLock()
+ defer t.activeSpanLock.RUnlock()
if t.activeSpan == nil || reflect.ValueOf(t.activeSpan).IsZero() {
return nil
}
@@ -55,6 +65,8 @@
}
func (t *TracingContext) SaveActiveSpan(s TracingSpan) {
+ t.activeSpanLock.Lock()
+ defer t.activeSpanLock.Unlock()
t.activeSpan = s
}
diff --git a/plugins/core/metrics.go b/plugins/core/metrics.go
index 5e0fd8a..ddac85c 100644
--- a/plugins/core/metrics.go
+++ b/plugins/core/metrics.go
@@ -75,7 +75,7 @@
func (t *Tracer) sendMetrics() {
meters := make([]reporter.ReportedMeter, 0)
// call collect hook
- for _, hook := range t.meterCollectListeners {
+ for _, hook := range t.allMeterCollectListeners() {
hook()
}
t.meterMap.Range(func(key, value interface{}) bool {
@@ -90,6 +90,14 @@
t.Reporter.SendMetrics(meters)
}
+func (t *Tracer) allMeterCollectListeners() []func() {
+ t.meterCollectListenersLock.RLock()
+ defer t.meterCollectListenersLock.RUnlock()
+ listeners := make([]func(), 0, len(t.meterCollectListeners))
+ listeners = append(listeners, t.meterCollectListeners...)
+ return listeners
+}
+
func (t *Tracer) NewCounter(name string, opt interface{}) interface{} {
counter := newCounter(name, nil, 0)
if o, ok := opt.(meterOpts); ok && o != nil {
@@ -118,6 +126,8 @@
}
func (t *Tracer) AddCollectHook(f func()) {
+ t.meterCollectListenersLock.Lock()
+ defer t.meterCollectListenersLock.Unlock()
t.meterCollectListeners = append(t.meterCollectListeners, f)
}
@@ -347,7 +357,7 @@
}
func (h *histogramBucket) Count() int64 {
- return *h.value
+ return atomic.LoadInt64(h.value)
}
func (h *histogramBucket) IsNegativeInfinity() bool {
diff --git a/plugins/core/operator/invocation.go b/plugins/core/operator/invocation.go
index 8241ca8..83fee06 100644
--- a/plugins/core/operator/invocation.go
+++ b/plugins/core/operator/invocation.go
@@ -27,6 +27,10 @@
returnValues []interface{}
context interface{}
+
+ // self obs
+ interTimeCost int64
+ beforeInterStart int64
}
func (i *realInvocation) CallerInstance() interface{} {
diff --git a/plugins/core/sampler.go b/plugins/core/sampler.go
index c9fb6ba..282f288 100644
--- a/plugins/core/sampler.go
+++ b/plugins/core/sampler.go
@@ -107,10 +107,13 @@
currentRate float64
defaultRate float64
sampler Sampler
+ locker sync.RWMutex
}
// IsSampled implements IsSampled() of Sampler.
func (s *DynamicSampler) IsSampled(operation string) bool {
+ s.locker.RLock()
+ defer s.locker.RUnlock()
return s.sampler.IsSampled(operation)
}
@@ -136,11 +139,15 @@
} else {
sampler = NewRandomSampler(samplingRate)
}
+ s.locker.Lock()
+ defer s.locker.Unlock()
s.sampler = sampler
s.currentRate = samplingRate
}
func (s *DynamicSampler) Value() string {
+ s.locker.RLock()
+ defer s.locker.RUnlock()
return fmt.Sprintf("%f", s.currentRate)
}
diff --git a/plugins/core/span_tracing.go b/plugins/core/span_tracing.go
index 4a4b6ac..87edd7c 100644
--- a/plugins/core/span_tracing.go
+++ b/plugins/core/span_tracing.go
@@ -255,12 +255,12 @@
}
func (rs *RootSegmentSpan) end0() {
- go func() {
- defer func() {
- _ = recover()
- }()
- rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
+ defer func() {
+ _ = recover()
}()
+ if rs != nil && rs.doneCh != nil && rs.SegmentContext.refNum != nil {
+ rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
+ }
}
func (rs *RootSegmentSpan) createRootSegmentContext(ctx *TracingContext, _ SegmentSpan) (err error) {
diff --git a/plugins/core/test_base.go b/plugins/core/test_base.go
index 5fa2ae3..749a653 100644
--- a/plugins/core/test_base.go
+++ b/plugins/core/test_base.go
@@ -57,7 +57,7 @@
return
}
if e := gls.(ContextSnapshoter); e != nil {
- SetGLS(e.TakeSnapShot(GetGLS()))
+ SetGLS(e.TakeSnapShot())
}
}
diff --git a/plugins/core/tracer.go b/plugins/core/tracer.go
index 184f5c8..3090502 100644
--- a/plugins/core/tracer.go
+++ b/plugins/core/tracer.go
@@ -49,11 +49,12 @@
// for plugin tools
tools *TracerTools
// for all metrics
- meterMap *sync.Map
- meterCollectListeners []func()
- ignoreSuffix []string
- traceIgnorePath []string
- mu sync.Mutex
+ meterMap *sync.Map
+ meterCollectListeners []func()
+ meterCollectListenersLock sync.RWMutex
+ ignoreSuffix []string
+ traceIgnorePath []string
+ mu sync.Mutex
}
func (t *Tracer) Init(entity *reporter.Entity, rep reporter.Reporter, samp Sampler, logger operator.LogOperator,
diff --git a/plugins/core/tracing.go b/plugins/core/tracing.go
index 789682c..65f8561 100644
--- a/plugins/core/tracing.go
+++ b/plugins/core/tracing.go
@@ -172,6 +172,8 @@
ctx = NewTracingContext()
SetGLS(ctx)
}
+ ctx.activeSpanLock.Lock()
+ defer ctx.activeSpanLock.Unlock()
ctx.activeSpan = snap.activeSpan
ctx.Runtime = snap.runtime
}
diff --git a/tools/go-agent/instrument/plugins/enhance_method.go b/tools/go-agent/instrument/plugins/enhance_method.go
index b6d918c..fe44848 100644
--- a/tools/go-agent/instrument/plugins/enhance_method.go
+++ b/tools/go-agent/instrument/plugins/enhance_method.go
@@ -153,8 +153,6 @@
result := make([]dst.Decl, 0)
result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s = &%s{}`, m.InterceptorVarName, m.InterceptorGeneratedName))...)
- result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s_interTimeCost int64`, m.FuncID))...)
- result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s_beforeInterStart int64`, m.FuncID))...)
preFunc := &dst.FuncDecl{
Name: &dst.Ident{Name: m.AdapterPreFuncName},
Type: &dst.FuncType{
diff --git a/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl b/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl
index fefe85e..26238e8 100644
--- a/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl
+++ b/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl
@@ -4,8 +4,8 @@
// log error
log.Errorf("execute interceptor after invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v, stack: %s",
"{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", r, tracing.DebugStack())
- {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
- operator.DurationOfInterceptor({{.FuncID}}_interTimeCost)
+ invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
+ operator.DurationOfInterceptor(invocation.interTimeCost)
}
}()
@@ -28,5 +28,5 @@
}
{{- end }}
}
-{{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
-operator.DurationOfInterceptor({{.FuncID}}_interTimeCost)
\ No newline at end of file
+invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
+operator.DurationOfInterceptor(invocation.interTimeCost)
\ No newline at end of file
diff --git a/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl b/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl
index 6398dcc..a2c17dd 100644
--- a/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl
+++ b/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl
@@ -1,15 +1,15 @@
-{{.FuncID}}_interTimeCost = int64(0)
-{{.FuncID}}_beforeInterStart = operator.NanoTime()
+invocation = &operator.realInvocation{}
+invocation.interTimeCost = int64(0)
+invocation.beforeInterStart = operator.NanoTime()
defer func() {
if err := recover(); err != nil {
operator.ErrorOfPlugin("{{.InstrumentName}}")
// log error
log.Errorf("execute interceptor before invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v, stack: %s",
"{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", err, tracing.DebugStack())
- {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
+ invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
}
}()
-invocation = &operator.realInvocation{}
{{if .Recvs -}}
invocation.callerInstance = *recv_0 // for caller if exist
{{- end}}
@@ -37,7 +37,7 @@
// using go2sky log error
log.Warnf("execute interceptor before invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v",
"{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", err)
- {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
+ invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
return {{ range $index, $value := .Results -}}
def_res_{{$index}},
@@ -49,13 +49,13 @@
def_res_{{$index}} = (invocation.returnValues[{{$index}}]).({{$value.PackagedTypeName}})
}
{{- end}}
- {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
+ invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
return {{ range $index, $value := .Results -}}
def_res_{{$index}},
{{- end}}invocation, true
}
-{{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart
+invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart
return {{ range $index, $value := .Results -}}
def_res_{{$index}},
diff --git a/tools/go-agent/instrument/runtime/instrument.go b/tools/go-agent/instrument/runtime/instrument.go
index 6f98d2a..878c849 100644
--- a/tools/go-agent/instrument/runtime/instrument.go
+++ b/tools/go-agent/instrument/runtime/instrument.go
@@ -253,15 +253,15 @@
}
type ContextSnapshoter interface {
- TakeSnapShot(val interface{}) interface{}
+ TakeSnapShot() interface{}
}
func goroutineChange(tls interface{}) interface{} {
if tls == nil {
return nil
}
- if taker, ok := tls.(ContextSnapshoter); ok {
- return taker.TakeSnapShot(tls)
+ if taker, ok := tls.(ContextSnapshoter); ok && taker != nil {
+ return taker.TakeSnapShot()
}
return tls
}