| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package golang |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| |
| "go.uber.org/atomic" |
| |
| "contrib.go.opencensus.io/exporter/ocagent" |
| "github.com/apache/rocketmq-clients/golang/v5/pkg/utils" |
| v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2" |
| "go.opencensus.io/stats" |
| "go.opencensus.io/stats/view" |
| "go.opencensus.io/tag" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/credentials" |
| ) |
| |
| type InvocationStatus string |
| |
| const ( |
| InvocationStatus_SUCCESS InvocationStatus = "success" |
| InvocationStatus_FAILURE InvocationStatus = "failure" |
| ) |
| |
| var ( |
| topicTag, _ = tag.NewKey("topic") |
| clientIdTag, _ = tag.NewKey("client_id") |
| invocationStatusTag, _ = tag.NewKey("invocation_status") |
| |
| MLatencyMs = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms") |
| |
| PublishLatencyView = view.View{ |
| Name: "rocketmq_send_cost_time", |
| Description: "Publish latency", |
| Measure: MLatencyMs, |
| Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500), |
| TagKeys: []tag.Key{topicTag, clientIdTag, invocationStatusTag}, |
| } |
| ) |
| |
| func init() { |
| if err := view.Register(&PublishLatencyView); err != nil { |
| sugarBaseLogger.Fatalf("failed to register views: %v", err) |
| } |
| view.SetReportingPeriod(time.Minute) |
| } |
| |
| type defaultClientMeter struct { |
| enabled atomic.Bool |
| endpoints *v2.Endpoints |
| ocaExporter view.Exporter |
| mutex sync.Mutex |
| } |
| |
| func (dcm *defaultClientMeter) shutdown() { |
| if !dcm.enabled.Load() { |
| return |
| } |
| dcm.mutex.Lock() |
| defer dcm.mutex.Unlock() |
| view.UnregisterExporter(dcm.ocaExporter) |
| if dcm.ocaExporter != nil { |
| exporter, ok := dcm.ocaExporter.(*ocagent.Exporter) |
| if ok { |
| err := exporter.Stop() |
| if err != nil { |
| sugarBaseLogger.Errorf("ocExporter stop failed, err=%w", err) |
| } |
| } |
| } |
| } |
| |
| func (dcm *defaultClientMeter) start() { |
| if !dcm.enabled.Load() { |
| return |
| } |
| view.RegisterExporter(dcm.ocaExporter) |
| } |
| |
| var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter { |
| return &defaultClientMeter{ |
| enabled: *atomic.NewBool(on), |
| endpoints: endpoints, |
| ocaExporter: exporter, |
| } |
| } |
| |
| type MessageMeterInterceptor interface { |
| MessageInterceptor |
| } |
| |
| type defaultMessageMeterInterceptor struct { |
| clientMeterProvider ClientMeterProvider |
| } |
| |
| type ClientMeterProvider interface { |
| Reset(metric *v2.Metric) |
| isEnabled() bool |
| getClientID() string |
| } |
| |
| var _ = ClientMeterProvider(&defaultClientMeterProvider{}) |
| |
| type defaultClientMeterProvider struct { |
| client Client |
| clientMeter *defaultClientMeter |
| globalMutex sync.Mutex |
| } |
| |
| var _ = MessageMeterInterceptor(&defaultMessageMeterInterceptor{}) |
| |
| var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor { |
| return &defaultMessageMeterInterceptor{ |
| clientMeterProvider: clientMeterProvider, |
| } |
| } |
| |
| func (dmmi *defaultMessageMeterInterceptor) doBefore(messageHookPoints MessageHookPoints, messageCommons []*MessageCommon) error { |
| return nil |
| } |
| |
| func (dmmi *defaultMessageMeterInterceptor) doAfterSendMessage(messageCommons []*MessageCommon, duration time.Duration, status MessageHookPointsStatus) error { |
| invocationStatus := InvocationStatus_FAILURE |
| if status == MessageHookPointsStatus_OK { |
| invocationStatus = InvocationStatus_SUCCESS |
| } |
| for _, messageCommon := range messageCommons { |
| err := stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Insert(topicTag, messageCommon.topic), tag.Insert(clientIdTag, dmmi.clientMeterProvider.getClientID()), tag.Insert(invocationStatusTag, string(invocationStatus))}, MLatencyMs.M(duration.Milliseconds())) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (dmmi *defaultMessageMeterInterceptor) doAfter(messageHookPoints MessageHookPoints, messageCommons []*MessageCommon, duration time.Duration, status MessageHookPointsStatus) error { |
| if !dmmi.clientMeterProvider.isEnabled() { |
| return nil |
| } |
| switch messageHookPoints { |
| case MessageHookPoints_SEND: |
| return dmmi.doAfterSendMessage(messageCommons, duration, status) |
| default: |
| break |
| } |
| return nil |
| } |
| func (dcmp *defaultClientMeterProvider) isEnabled() bool { |
| return dcmp.clientMeter.enabled.Load() |
| } |
| func (dcmp *defaultClientMeterProvider) getClientID() string { |
| return dcmp.client.GetClientID() |
| } |
| func (dcmp *defaultClientMeterProvider) Reset(metric *v2.Metric) { |
| dcmp.globalMutex.Lock() |
| defer dcmp.globalMutex.Unlock() |
| endpoints := metric.GetEndpoints() |
| if dcmp.clientMeter.enabled.Load() && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) { |
| sugarBaseLogger.Infof("metric settings is satisfied by the current message meter, clientId=%s", dcmp.client.GetClientID()) |
| return |
| } |
| |
| if !metric.GetOn() { |
| dcmp.clientMeter.shutdown() |
| sugarBaseLogger.Infof("metric is off, clientId=%s", dcmp.client.GetClientID()) |
| dcmp.clientMeter = NewDefaultClientMeter(nil, false, nil, dcmp.client.GetClientID()) |
| return |
| } |
| agentAddr := utils.ParseAddress(utils.SelectAnAddress(endpoints)) |
| exporter, err := ocagent.NewExporter( |
| ocagent.WithInsecure(), |
| ocagent.WithTLSCredentials(credentials.NewTLS(defaultConnOptions.TLS)), |
| ocagent.WithAddress(agentAddr), |
| ocagent.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(dcmp.invokeWithSign())), |
| ) |
| if err != nil { |
| sugarBaseLogger.Errorf("exception raised when resetting message meter, clientId=%s", dcmp.client.GetClientID()) |
| return |
| } |
| // Reset message meter. |
| dcmp.clientMeter.shutdown() |
| dcmp.clientMeter = NewDefaultClientMeter(exporter, true, endpoints, dcmp.client.GetClientID()) |
| dcmp.clientMeter.start() |
| sugarBaseLogger.Infof("metrics is on, endpoints=%v, clientId=%s", endpoints, dcmp.client.GetClientID()) |
| } |
| |
| var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider { |
| cmp := &defaultClientMeterProvider{ |
| client: client, |
| clientMeter: NewDefaultClientMeter(nil, false, nil, "nil"), |
| } |
| client.registerMessageInterceptor(NewDefaultMessageMeterInterceptor(cmp)) |
| return cmp |
| } |
| |
| var _ = ClientMeterProvider(&defaultClientMeterProvider{}) |
| |
| func (dcmp *defaultClientMeterProvider) invokeWithSign() grpc.UnaryClientInterceptor { |
| return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
| newCtx := dcmp.client.Sign(ctx) |
| return invoker(newCtx, method, req, reply, cc, opts...) |
| } |
| } |