| // Copyright 2018, OpenCensus Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package ocagent |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "sync" |
| "time" |
| |
| "google.golang.org/api/support/bundler" |
| "google.golang.org/grpc" |
| |
| "go.opencensus.io/resource" |
| "go.opencensus.io/stats/view" |
| "go.opencensus.io/trace" |
| |
| commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" |
| agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" |
| agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" |
| metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" |
| resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" |
| tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" |
| ) |
| |
var (
	// startupMu guards startTime.
	startupMu sync.Mutex
	// startTime holds the instant at which this package was initialized.
	startTime time.Time
)

// init records the package initialization time while holding startupMu.
func init() {
	startupMu.Lock()
	defer startupMu.Unlock()
	startTime = time.Now()
}
| |
| var _ trace.Exporter = (*Exporter)(nil) |
| var _ view.Exporter = (*Exporter)(nil) |
| |
// Exporter uploads OpenCensus spans and view data to an agent over gRPC
// streams. Spans and view data handed to ExportSpan and ExportView are
// buffered in bundlers and uploaded in batches.
type Exporter struct {
	// connectionState records whether the exporter is currently connected.
	// NOTE(review): presumably read/written atomically by connected() and
	// setStateDisconnected(), which are defined outside this section — confirm.
	connectionState int32

	// mu protects the non-atomic and non-channel variables
	mu sync.RWMutex
	// senderMu protects the concurrent unsafe traceExporter client
	senderMu sync.RWMutex
	// started is set by Start and cleared again by Stop.
	started bool
	// stopped is set by Stop.
	stopped bool
	// agentAddress is the address to dial; when empty, prepareAgentAddress
	// falls back to DefaultAgentHost:DefaultAgentPort.
	agentAddress string
	// serviceName is passed to NodeWithStartTime to build nodeInfo.
	serviceName string
	// canDialInsecure adds grpc.WithInsecure() to the dial options when true.
	canDialInsecure bool
	// traceExporter is the stream on which span batches are sent.
	traceExporter agenttracepb.TraceService_ExportClient
	// metricsExporter is the stream on which metric batches are sent.
	metricsExporter agentmetricspb.MetricsService_ExportClient
	// nodeInfo identifies this process; it is sent in the first message of
	// each stream.
	nodeInfo *commonpb.Node
	// grpcClientConn is the connection currently in use; it is replaced by
	// enableConnectionStreams and closed by Stop.
	grpcClientConn *grpc.ClientConn
	// NOTE(review): reconnectionPeriod is not read in this section; presumably
	// it is the interval used by the background reconnection loop — confirm.
	reconnectionPeriod time.Duration
	// resource is built by resourceProtoFromEnv and sent in the first message
	// of the trace and metrics streams.
	resource *resourcepb.Resource
	// compressor, when non-empty, is the gRPC compressor name applied to all
	// calls via grpc.UseCompressor.
	compressor string

	// startOnce ensures the body of Start runs at most once.
	startOnce sync.Once
	// stopCh is closed by Stop; the export paths check it before sending.
	stopCh chan bool
	// disconnectedCh is created by Start with a buffer of one.
	// NOTE(review): presumably signals the background connector that the
	// connection was lost — confirm against setStateDisconnected.
	disconnectedCh chan bool

	// backgroundConnectionDoneCh is received from by Stop to wait for the
	// background connector to finish.
	backgroundConnectionDoneCh chan bool

	// traceBundler batches spans added by ExportSpan and hands each batch to
	// uploadTraces.
	traceBundler *bundler.Bundler

	// viewDataBundler is the bundler to enable conversion
	// from OpenCensus-Go view.Data to metricspb.Metric.
	// Please do not confuse it with metricsBundler!
	viewDataBundler *bundler.Bundler
}
| |
| func NewExporter(opts ...ExporterOption) (*Exporter, error) { |
| exp, err := NewUnstartedExporter(opts...) |
| if err != nil { |
| return nil, err |
| } |
| if err := exp.Start(); err != nil { |
| return nil, err |
| } |
| return exp, nil |
| } |
| |
| const spanDataBufferSize = 300 |
| |
| func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) { |
| e := new(Exporter) |
| for _, opt := range opts { |
| opt.withExporter(e) |
| } |
| traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) { |
| e.uploadTraces(bundle.([]*trace.SpanData)) |
| }) |
| traceBundler.DelayThreshold = 2 * time.Second |
| traceBundler.BundleCountThreshold = spanDataBufferSize |
| e.traceBundler = traceBundler |
| |
| viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) { |
| e.uploadViewData(bundle.([]*view.Data)) |
| }) |
| viewDataBundler.DelayThreshold = 2 * time.Second |
| viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable. |
| e.viewDataBundler = viewDataBundler |
| e.nodeInfo = NodeWithStartTime(e.serviceName) |
| e.resource = resourceProtoFromEnv() |
| |
| return e, nil |
| } |
| |
// NOTE(review): neither constant is referenced in this section of the file;
// presumably they bound the retries of the initial config and trace
// handshakes — confirm against their users.
const (
	maxInitialConfigRetries = 10
	maxInitialTracesRetries = 10
)

var (
	// errAlreadyStarted is returned by Start on every call after the first.
	errAlreadyStarted = errors.New("already started")
	// errNotStarted is returned by Stop and enableConnectionStreams when the
	// exporter is not in the started state.
	errNotStarted = errors.New("not started")
	// errStopped is returned by ExportTraceServiceRequest once stopCh has
	// been closed.
	errStopped = errors.New("stopped")
	// errNoConnection is returned by ExportTraceServiceRequest when the
	// exporter reports that it is not connected.
	errNoConnection = errors.New("no active connection")
)
| |
| // Start dials to the agent, establishing a connection to it. It also |
| // initiates the Config and Trace services by sending over the initial |
| // messages that consist of the node identifier. Start invokes a background |
| // connector that will reattempt connections to the agent periodically |
| // if the connection dies. |
| func (ae *Exporter) Start() error { |
| var err = errAlreadyStarted |
| ae.startOnce.Do(func() { |
| ae.mu.Lock() |
| defer ae.mu.Unlock() |
| |
| ae.started = true |
| ae.disconnectedCh = make(chan bool, 1) |
| ae.stopCh = make(chan bool) |
| ae.backgroundConnectionDoneCh = make(chan bool) |
| |
| ae.setStateDisconnected() |
| go ae.indefiniteBackgroundConnection() |
| |
| err = nil |
| }) |
| |
| return err |
| } |
| |
| func (ae *Exporter) prepareAgentAddress() string { |
| if ae.agentAddress != "" { |
| return ae.agentAddress |
| } |
| return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort) |
| } |
| |
| func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error { |
| ae.mu.RLock() |
| started := ae.started |
| nodeInfo := ae.nodeInfo |
| ae.mu.RUnlock() |
| |
| if !started { |
| return errNotStarted |
| } |
| |
| ae.mu.Lock() |
| // If the previous clientConn was non-nil, close it |
| if ae.grpcClientConn != nil { |
| _ = ae.grpcClientConn.Close() |
| } |
| ae.grpcClientConn = cc |
| ae.mu.Unlock() |
| |
| if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil { |
| return err |
| } |
| |
| return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo) |
| } |
| |
| func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { |
| // Initiate the trace service by sending over node identifier info. |
| traceSvcClient := agenttracepb.NewTraceServiceClient(cc) |
| traceExporter, err := traceSvcClient.Export(context.Background()) |
| if err != nil { |
| return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err) |
| } |
| |
| firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{ |
| Node: node, |
| Resource: ae.resource, |
| } |
| if err := traceExporter.Send(firstTraceMessage); err != nil { |
| return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) |
| } |
| |
| ae.mu.Lock() |
| ae.traceExporter = traceExporter |
| ae.mu.Unlock() |
| |
| // Initiate the config service by sending over node identifier info. |
| configStream, err := traceSvcClient.Config(context.Background()) |
| if err != nil { |
| return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err) |
| } |
| firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node} |
| if err := configStream.Send(firstCfgMessage); err != nil { |
| return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) |
| } |
| |
| // In the background, handle trace configurations that are beamed down |
| // by the agent, but also reply to it with the applied configuration. |
| go ae.handleConfigStreaming(configStream) |
| |
| return nil |
| } |
| |
| func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { |
| metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc) |
| metricsExporter, err := metricsSvcClient.Export(context.Background()) |
| if err != nil { |
| return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err) |
| } |
| // Initiate the metrics service by sending over the first message just containing the Node and Resource. |
| firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{ |
| Node: node, |
| Resource: ae.resource, |
| } |
| if err := metricsExporter.Send(firstMetricsMessage); err != nil { |
| return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err) |
| } |
| |
| ae.mu.Lock() |
| ae.metricsExporter = metricsExporter |
| ae.mu.Unlock() |
| |
| // With that we are good to go and can start sending metrics |
| return nil |
| } |
| |
| func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) { |
| addr := ae.prepareAgentAddress() |
| var dialOpts []grpc.DialOption |
| if ae.canDialInsecure { |
| dialOpts = append(dialOpts, grpc.WithInsecure()) |
| } |
| if ae.compressor != "" { |
| dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor))) |
| } |
| return grpc.Dial(addr, dialOpts...) |
| } |
| |
| func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error { |
| // Note: We haven't yet implemented configuration sending so we |
| // should NOT be changing connection states within this function for now. |
| for { |
| recv, err := configStream.Recv() |
| if err != nil { |
| // TODO: Check if this is a transient error or exponential backoff-able. |
| return err |
| } |
| cfg := recv.Config |
| if cfg == nil { |
| continue |
| } |
| |
| // Otherwise now apply the trace configuration sent down from the agent |
| if psamp := cfg.GetProbabilitySampler(); psamp != nil { |
| trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)}) |
| } else if csamp := cfg.GetConstantSampler(); csamp != nil { |
| alwaysSample := csamp.Decision == true |
| if alwaysSample { |
| trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) |
| } else { |
| trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()}) |
| } |
| } else { // TODO: Add the rate limiting sampler here |
| } |
| |
| // Then finally send back to upstream the newly applied configuration |
| err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}}) |
| if err != nil { |
| return err |
| } |
| } |
| } |
| |
| // Stop shuts down all the connections and resources |
| // related to the exporter. |
| func (ae *Exporter) Stop() error { |
| ae.mu.RLock() |
| cc := ae.grpcClientConn |
| started := ae.started |
| stopped := ae.stopped |
| ae.mu.RUnlock() |
| |
| if !started { |
| return errNotStarted |
| } |
| if stopped { |
| // TODO: tell the user that we've already stopped, so perhaps a sentinel error? |
| return nil |
| } |
| |
| ae.Flush() |
| |
| // Now close the underlying gRPC connection. |
| var err error |
| if cc != nil { |
| err = cc.Close() |
| } |
| |
| // At this point we can change the state variables: started and stopped |
| ae.mu.Lock() |
| ae.started = false |
| ae.stopped = true |
| ae.mu.Unlock() |
| close(ae.stopCh) |
| |
| // Ensure that the backgroundConnector returns |
| <-ae.backgroundConnectionDoneCh |
| |
| return err |
| } |
| |
| func (ae *Exporter) ExportSpan(sd *trace.SpanData) { |
| if sd == nil { |
| return |
| } |
| _ = ae.traceBundler.Add(sd, 1) |
| } |
| |
| func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error { |
| if batch == nil || len(batch.Spans) == 0 { |
| return nil |
| } |
| |
| select { |
| case <-ae.stopCh: |
| return errStopped |
| |
| default: |
| if !ae.connected() { |
| return errNoConnection |
| } |
| |
| ae.senderMu.Lock() |
| err := ae.traceExporter.Send(batch) |
| ae.senderMu.Unlock() |
| if err != nil { |
| ae.setStateDisconnected() |
| return err |
| } |
| return nil |
| } |
| } |
| |
| func (ae *Exporter) ExportView(vd *view.Data) { |
| if vd == nil { |
| return |
| } |
| _ = ae.viewDataBundler.Add(vd, 1) |
| } |
| |
| func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span { |
| if len(sdl) == 0 { |
| return nil |
| } |
| protoSpans := make([]*tracepb.Span, 0, len(sdl)) |
| for _, sd := range sdl { |
| if sd != nil { |
| protoSpans = append(protoSpans, ocSpanToProtoSpan(sd)) |
| } |
| } |
| return protoSpans |
| } |
| |
| func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) { |
| select { |
| case <-ae.stopCh: |
| return |
| |
| default: |
| if !ae.connected() { |
| return |
| } |
| |
| protoSpans := ocSpanDataToPbSpans(sdl) |
| if len(protoSpans) == 0 { |
| return |
| } |
| ae.senderMu.Lock() |
| err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{ |
| Spans: protoSpans, |
| }) |
| ae.senderMu.Unlock() |
| if err != nil { |
| ae.setStateDisconnected() |
| } |
| } |
| } |
| |
| func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric { |
| if len(vdl) == 0 { |
| return nil |
| } |
| metrics := make([]*metricspb.Metric, 0, len(vdl)) |
| for _, vd := range vdl { |
| if vd != nil { |
| vmetric, err := viewDataToMetric(vd) |
| // TODO: (@odeke-em) somehow report this error, if it is non-nil. |
| if err == nil && vmetric != nil { |
| metrics = append(metrics, vmetric) |
| } |
| } |
| } |
| return metrics |
| } |
| |
| func (ae *Exporter) uploadViewData(vdl []*view.Data) { |
| select { |
| case <-ae.stopCh: |
| return |
| |
| default: |
| if !ae.connected() { |
| return |
| } |
| |
| protoMetrics := ocViewDataToPbMetrics(vdl) |
| if len(protoMetrics) == 0 { |
| return |
| } |
| err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{ |
| Metrics: protoMetrics, |
| // TODO:(@odeke-em) |
| // a) Figure out how to derive a Node from the environment |
| // b) Figure out how to derive a Resource from the environment |
| // or better letting users of the exporter configure it. |
| }) |
| if err != nil { |
| ae.setStateDisconnected() |
| } |
| } |
| } |
| |
| func (ae *Exporter) Flush() { |
| ae.traceBundler.Flush() |
| ae.viewDataBundler.Flush() |
| } |
| |
| func resourceProtoFromEnv() *resourcepb.Resource { |
| rs, _ := resource.FromEnv(context.Background()) |
| if rs == nil { |
| return nil |
| } |
| |
| rprs := &resourcepb.Resource{ |
| Type: rs.Type, |
| } |
| if rs.Labels != nil { |
| rprs.Labels = make(map[string]string) |
| for k, v := range rs.Labels { |
| rprs.Labels[k] = v |
| } |
| } |
| return rprs |
| } |