| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package finders |
| |
| import ( |
| "context" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "github.com/hashicorp/go-multierror" |
| |
| v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/process/v3" |
| |
| "github.com/apache/skywalking-rover/pkg/core" |
| "github.com/apache/skywalking-rover/pkg/module" |
| "github.com/apache/skywalking-rover/pkg/process/api" |
| "github.com/apache/skywalking-rover/pkg/process/finders/base" |
| "github.com/apache/skywalking-rover/pkg/tools" |
| ) |
| |
// ProcessStorage tracks the processes detected by every registered finder,
// keeps their registration with the backend up to date, and fans process
// add/delete events out to registered listeners.
type ProcessStorage struct {
	// processes holds the tracked process contexts grouped by the detect type
	// of the finder that reported them. The map is written only while mutex is
	// held (AddNewProcessInFinder, SyncAllProcessInFinder).
	// NOTE(review): several read-only lookups in this file do not take mutex.
	processes map[api.ProcessDetectType][]*ProcessContext
	mutex     sync.Mutex

	// process listeners
	// listeners receive process events; eventQueue carries add/delete events
	// produced by processEventBuilder.Send; initListenQueue carries newly added
	// listeners that need an initial recheck; listenerRecheckInterval is the
	// period at which every listener is asked to recheck all processes.
	listeners               []api.ProcessListener
	eventQueue              chan *processEvent
	initListenQueue         chan api.ProcessListener
	listenerRecheckInterval time.Duration

	// working with backend
	// reportInterval is the period of reportAllProcesses. Every
	// propertiesReportFactor-th round re-sends full properties for processes
	// that would otherwise only be kept alive. roverID is sent as the agent ID.
	// finders build payloads and parse backend IDs per detect type.
	// reportedCount (updated atomically) counts report rounds.
	reportInterval         time.Duration
	propertiesReportFactor int
	roverID                string
	processClient          v3.EBPFProcessServiceClient
	finders                map[api.ProcessDetectType]base.ProcessFinder
	reportedCount          int64

	// report context
	// ctx bounds the background loops and backend calls; cancel is invoked
	// by StopReport.
	ctx    context.Context
	cancel context.CancelFunc
}
| |
| func NewProcessStorage(ctx context.Context, moduleManager *module.Manager, reportInterval time.Duration, |
| propertiesReportFactor int, finderList []base.ProcessFinder, listenerRecheckInterval time.Duration) (*ProcessStorage, error) { |
| data := make(map[api.ProcessDetectType][]*ProcessContext) |
| // working with core module |
| coreOperator := moduleManager.FindModule(core.ModuleName).(core.Operator) |
| roverID := coreOperator.InstanceID() |
| backendConn := coreOperator.BackendOperator().GetConnection() |
| processClient := v3.NewEBPFProcessServiceClient(backendConn) |
| ctx, cancel := context.WithCancel(ctx) |
| fs := make(map[api.ProcessDetectType]base.ProcessFinder) |
| for _, f := range finderList { |
| fs[f.DetectType()] = f |
| } |
| return &ProcessStorage{ |
| processes: data, |
| reportInterval: reportInterval, |
| propertiesReportFactor: propertiesReportFactor, |
| eventQueue: make(chan *processEvent, 100), |
| initListenQueue: make(chan api.ProcessListener, 100), |
| listenerRecheckInterval: listenerRecheckInterval, |
| reportedCount: 0, |
| roverID: roverID, |
| processClient: processClient, |
| finders: fs, |
| ctx: ctx, |
| cancel: cancel, |
| }, nil |
| } |
| |
| func (s *ProcessStorage) StartReport() { |
| // for report all processes |
| go func() { |
| timeTicker := time.NewTicker(s.reportInterval) |
| for { |
| select { |
| case <-timeTicker.C: |
| if err := s.reportAllProcesses(); err != nil { |
| log.Errorf("report all processes error: %v", err) |
| } |
| case <-s.ctx.Done(): |
| timeTicker.Stop() |
| return |
| } |
| } |
| }() |
| |
| // for start listener |
| go func() { |
| timeTicker := time.NewTicker(s.listenerRecheckInterval) |
| for { |
| select { |
| case <-timeTicker.C: |
| s.notifyToRecheckAllProcesses(s.listeners) |
| case e := <-s.eventQueue: |
| s.consumeProcessEvent(s.listeners, e) |
| case l := <-s.initListenQueue: |
| s.notifyToRecheckAllProcesses([]api.ProcessListener{l}) |
| case <-s.ctx.Done(): |
| timeTicker.Stop() |
| return |
| } |
| } |
| }() |
| } |
| |
| func (s *ProcessStorage) StopReport() error { |
| s.cancel() |
| return nil |
| } |
| |
| func (s *ProcessStorage) reportAllProcesses() error { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| if len(s.processes) == 0 { |
| return nil |
| } |
| |
| // build process list(wait report or keep alive) |
| waitReportProcesses := make([]*ProcessContext, 0) |
| keepAliveProcesses := make([]*ProcessContext, 0) |
| for _, finderProcesses := range s.processes { |
| for _, p := range finderProcesses { |
| switch p.syncStatus { |
| case NotReport: |
| waitReportProcesses = append(waitReportProcesses, p) |
| case ReportSuccess: |
| keepAliveProcesses = append(keepAliveProcesses, p) |
| } |
| } |
| } |
| |
| // if rover should report the properties, then need to force remove all keep alive processes to report |
| shouldReportProperties := atomic.AddInt64(&s.reportedCount, 1)%int64(s.propertiesReportFactor) == 0 |
| if shouldReportProperties { |
| log.Infof("detection has reached the properties report factor, forced to report all processes properties") |
| waitReportProcesses = append(waitReportProcesses, keepAliveProcesses...) |
| keepAliveProcesses = make([]*ProcessContext, 0) |
| } |
| var result error |
| if err := s.processesReport(waitReportProcesses); err != nil { |
| result = multierror.Append(result, err) |
| } |
| if err := s.processesKeepAlive(keepAliveProcesses); err != nil { |
| result = multierror.Append(result, err) |
| } |
| |
| return result |
| } |
| |
| func (s *ProcessStorage) processesKeepAlive(waitKeepAliveProcess []*ProcessContext) error { |
| if len(waitKeepAliveProcess) == 0 { |
| return nil |
| } |
| |
| processIDList := make([]*v3.EBPFProcessPingPkg, 0) |
| for _, ps := range waitKeepAliveProcess { |
| if ps.id == "" { |
| log.Warnf("the process id is not found before keep alive, need to report, pid: %d, process name: %s", |
| ps.Pid(), ps.Entity().ProcessName) |
| ps.syncStatus = NotReport |
| continue |
| } |
| processIDList = append(processIDList, &v3.EBPFProcessPingPkg{ |
| EntityMetadata: &v3.EBPFProcessEntityMetadata{ |
| Layer: ps.Entity().Layer, |
| ServiceName: ps.Entity().ServiceName, |
| InstanceName: ps.Entity().InstanceName, |
| ProcessName: ps.Entity().ProcessName, |
| Labels: ps.Entity().Labels, |
| }, |
| Properties: s.finders[ps.detectType].BuildNecessaryProperties(ps.detectProcess), |
| }) |
| } |
| |
| _, err := s.processClient.KeepAlive(s.ctx, &v3.EBPFProcessPingPkgList{ |
| EbpfAgentID: s.roverID, |
| Processes: processIDList, |
| }) |
| return err |
| } |
| |
| func (s *ProcessStorage) processesReport(waitReportProcesses []*ProcessContext) error { |
| if len(waitReportProcesses) == 0 { |
| return nil |
| } |
| |
| properties := make([]*v3.EBPFProcessProperties, 0) |
| buildContext := &base.BuildEBPFProcessContext{} |
| buildContext.HostIP = tools.DefaultHostIPAddress() |
| for _, ps := range waitReportProcesses { |
| properties = append(properties, s.finders[ps.DetectType()].BuildEBPFProcess(buildContext, ps.detectProcess)) |
| } |
| processes, err := s.processClient.ReportProcesses(s.ctx, &v3.EBPFProcessReportList{Processes: properties, EbpfAgentID: s.roverID}) |
| if err != nil { |
| return err |
| } |
| |
| eventBuilder := s.newProcessEventBuilder(ProcessOperateAdd) |
| for _, waitProcess := range waitReportProcesses { |
| found := false |
| for _, reportedProcess := range processes.GetProcesses() { |
| id := s.finders[waitProcess.DetectType()].ParseProcessID(waitProcess.detectProcess, reportedProcess) |
| if id == "" { |
| continue |
| } |
| |
| s.updateProcessToUploadSuccess(waitProcess, id) |
| found = true |
| eventBuilder.AddProcess(waitProcess.Pid(), waitProcess) |
| break |
| } |
| |
| if !found { |
| s.updateProcessToUploadIgnored(waitProcess) |
| } |
| } |
| |
| eventBuilder.Send() |
| return nil |
| } |
| |
| func (s *ProcessStorage) AddNewProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| |
| addProcessBuilder := s.newProcessEventBuilder(ProcessOperateAdd) |
| for _, newProcess := range processes { |
| if newProcess == nil { |
| continue |
| } |
| founded := false |
| for _, existingProcess := range s.processes[finder] { |
| if existingProcess.Pid() == newProcess.Pid() && existingProcess.Entity().SameWith(newProcess.Entity()) { |
| founded = true |
| break |
| } |
| } |
| |
| // if not found in existing processes, need to add this process |
| if !founded { |
| processContext := s.constructNewProcessContext(finder, newProcess) |
| addProcessBuilder.AddProcess(newProcess.Pid(), processContext) |
| s.processes[finder] = append(s.processes[finder], processContext) |
| log.Infof("detected new process by add process: pid: %d, entity: %s", newProcess.Pid(), newProcess.Entity()) |
| } |
| } |
| addProcessBuilder.Send() |
| } |
| |
| func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| |
| newProcesses := make([]*ProcessContext, 0) |
| |
| existingProcesses := s.processes[finder] |
| existingProcessHasFounded := make(map[*ProcessContext]bool) |
| for _, p := range existingProcesses { |
| existingProcessHasFounded[p] = false |
| } |
| |
| addProcessBuilder := s.newProcessEventBuilder(ProcessOperateAdd) |
| for _, syncProcess := range processes { |
| if syncProcess == nil { |
| continue |
| } |
| founded := false |
| for _, existingProcess := range existingProcesses { |
| if syncProcess.Pid() == existingProcess.Pid() && syncProcess.Entity().SameWith(existingProcess.Entity()) { |
| newProcesses = append(newProcesses, existingProcess) |
| existingProcessHasFounded[existingProcess] = true |
| founded = true |
| break |
| } |
| } |
| |
| // if not found in existing processes, need to add this process |
| if !founded { |
| processContext := s.constructNewProcessContext(finder, syncProcess) |
| newProcesses = append(newProcesses, processContext) |
| addProcessBuilder.AddProcess(syncProcess.Pid(), newProcesses[len(newProcesses)-1]) |
| log.Infof("detected new process by sync all: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity()) |
| } |
| } |
| addProcessBuilder.Send() |
| |
| // log the dead processes |
| eventBuilder := s.newProcessEventBuilder(ProcessOperateDelete) |
| for p, found := range existingProcessHasFounded { |
| if found { |
| continue |
| } |
| log.Infof("the process has been recognized as dead, so deleted. pid: %d, entity: %s, id: %s", p.Pid(), p.Entity(), p.id) |
| eventBuilder.AddProcess(p.Pid(), p) |
| } |
| |
| s.processes[finder] = newProcesses |
| eventBuilder.Send() |
| } |
| |
| func (s *ProcessStorage) constructNewProcessContext(finder api.ProcessDetectType, process api.DetectedProcess) *ProcessContext { |
| exporsedPorts := make(map[int]bool) |
| for _, p := range process.ExposePorts() { |
| exporsedPorts[p] = true |
| } |
| return &ProcessContext{ |
| syncStatus: NotReport, |
| detectProcess: process, |
| detectType: finder, |
| exposedPorts: exporsedPorts, |
| } |
| } |
| |
| func (s *ProcessStorage) updateProcessToUploadSuccess(pc *ProcessContext, id string) { |
| reported := pc.id == id |
| pc.id = id |
| pc.syncStatus = ReportSuccess |
| if !reported { |
| log.Infof("uploaded process pid: %d, name: %s, id: %s", pc.detectProcess.Pid(), pc.detectProcess.Entity().ProcessName, id) |
| } |
| } |
| |
| func (s *ProcessStorage) updateProcessToUploadIgnored(pc *ProcessContext) { |
| pc.syncStatus = Ignore |
| log.Infof("could not found the process id from upstream, pid: %d, entity: %v", pc.Pid(), pc.Entity()) |
| } |
| |
| func (s *ProcessStorage) GetAllProcesses() []api.ProcessInterface { |
| result := make([]api.ProcessInterface, 0) |
| for _, processList := range s.processes { |
| for _, p := range processList { |
| result = append(result, p) |
| } |
| } |
| return result |
| } |
| |
| func (s *ProcessStorage) FindAllRegisteredProcesses() []api.ProcessInterface { |
| result := make([]api.ProcessInterface, 0) |
| for _, processList := range s.processes { |
| for _, p := range processList { |
| if p.syncStatus == ReportSuccess { |
| result = append(result, p) |
| } |
| } |
| } |
| return result |
| } |
| |
| func (s *ProcessStorage) FindProcessByID(processID string) api.ProcessInterface { |
| for _, finderProcesses := range s.processes { |
| for _, p := range finderProcesses { |
| if p.id == processID { |
| return p |
| } |
| } |
| } |
| return nil |
| } |
| |
| func (s *ProcessStorage) FindProcessByPID(pid int32) []api.ProcessInterface { |
| result := make([]api.ProcessInterface, 0) |
| for _, finderProcesses := range s.processes { |
| for _, p := range finderProcesses { |
| if p.Pid() == pid { |
| result = append(result, p) |
| } |
| } |
| } |
| return result |
| } |
| |
| func (s *ProcessStorage) AddListener(listener api.ProcessListener) { |
| s.listeners = append(s.listeners, listener) |
| s.initListenQueue <- listener |
| } |
| |
| func (s *ProcessStorage) DeleteListener(listener api.ProcessListener) { |
| result := make([]api.ProcessListener, 0) |
| for _, l := range s.listeners { |
| if l != listener { |
| result = append(result, l) |
| } |
| } |
| s.listeners = result |
| } |
| |
// ProcessOperate identifies what happened to the processes carried by a
// process event.
type ProcessOperate int

// Process event operations. They are typed as ProcessOperate (rather than
// untyped integer constants) so they carry the operation type wherever they
// are used.
const (
	// ProcessOperateAdd announces newly detected or newly reported processes.
	ProcessOperateAdd ProcessOperate = 1
	// ProcessOperateDelete announces processes that are no longer detected.
	ProcessOperateDelete ProcessOperate = 2
)
| |
// processEventBuilder accumulates processes for a single operation (add or
// delete), grouped by PID, and turns them into one processEvent per PID on
// Send.
type processEventBuilder struct {
	// processes collected so far, keyed by PID
	processes map[int32][]api.ProcessInterface
	// operation applied to every collected process
	operate ProcessOperate
	// storage whose event queue receives the built events
	storage *ProcessStorage
}
| |
| func (s *ProcessStorage) newProcessEventBuilder(operate ProcessOperate) *processEventBuilder { |
| return &processEventBuilder{ |
| processes: make(map[int32][]api.ProcessInterface), |
| operate: operate, |
| storage: s, |
| } |
| } |
| |
| func (p *processEventBuilder) AddProcess(pid int32, pi api.ProcessInterface) { |
| ps := p.processes[pid] |
| ps = append(ps, pi) |
| p.processes[pid] = ps |
| } |
| |
| func (p *processEventBuilder) Send() { |
| for pid, processes := range p.processes { |
| p.storage.eventQueue <- &processEvent{ |
| pid: pid, |
| processes: processes, |
| operate: p.operate, |
| } |
| } |
| } |
| |
// processEvent is one unit of work on the storage's event queue: a single
// operation applied to all processes that share a PID.
type processEvent struct {
	pid       int32
	processes []api.ProcessInterface
	operate   ProcessOperate
}
| |
| func (s *ProcessStorage) consumeProcessEvent(listeners []api.ProcessListener, e *processEvent) { |
| for _, l := range listeners { |
| if e.operate == ProcessOperateAdd { |
| l.AddNewProcess(e.pid, e.processes) |
| } else { |
| l.RemoveProcess(e.pid, e.processes) |
| } |
| } |
| } |
| |
| func (s *ProcessStorage) notifyToRecheckAllProcesses(listeners []api.ProcessListener) { |
| if len(listeners) == 0 { |
| return |
| } |
| // build all processes |
| events := s.newProcessEventBuilder(ProcessOperateAdd) |
| for _, pcs := range s.processes { |
| for _, pc := range pcs { |
| events.AddProcess(pc.Pid(), pc) |
| } |
| } |
| for _, l := range listeners { |
| l.RecheckAllProcesses(events.processes) |
| } |
| } |