blob: 45dd12fd64ce192276858fedffb05342d5ca795f [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package continuous
import (
profilingv3 ""
meterv3 ""
// checkerRegistration holds every base.Checker known to this package. It starts
// empty and is extended in init; the functions in this file iterate it to
// initialize, fetch, check and close the checkers.
var checkerRegistration = make([]base.Checker, 0)
// init appends the built-in checkers to checkerRegistration.
// NOTE(review): the arguments of the append call are not visible in this view;
// only the group labels (system, process, network) remain. Confirm the concrete
// checker constructors against the full file.
func init() {
checkerRegistration = append(checkerRegistration,
// system
// process
// network
// Checkers drives the continuous profiling checks: it keeps the per-service
// policy cache, periodically fetches data through the registered checkers,
// evaluates their thresholds and flushes the resulting metrics.
type Checkers struct {
// meterPrefix is handed to base.NewMetricsAppender when checker metrics are built.
meterPrefix string
// fetchDuration is the interval of the fetch ticker (parsed from conf.FetchInterval).
fetchDuration time.Duration
// checkDuration is the interval of the check ticker (parsed from conf.CheckInterval).
checkDuration time.Duration
// processOperator supplies the registered processes used to build policy queries.
processOperator process.Operator
// triggers is stored as passed to NewCheckers; its use is not visible in this view.
triggers *Triggers
// policiesCache maps a service name to the policy most recently received for it.
policiesCache map[string]*base.ServicePolicy
// meterClient is the client the metrics appender is flushed to.
meterClient meterv3.MeterReportServiceClient
// continuousClient is the client used to query policy updates.
continuousClient profilingv3.ContinuousProfilingServiceClient
// ctx is used for the backend calls and is watched by the loop started in Start.
ctx context.Context
func NewCheckers(ctx context.Context, moduleMgr *module.Manager, conf *base.ContinuousConfig, triggers *Triggers) (*Checkers, error) {
connection := moduleMgr.FindModule(core.ModuleName).(core.Operator).BackendOperator().GetConnection()
meterClient := meterv3.NewMeterReportServiceClient(connection)
continuousClient := profilingv3.NewContinuousProfilingServiceClient(connection)
if conf.MeterPrefix == "" {
return nil, fmt.Errorf("the continuous profiling meter prefix cannot be empty")
fetchDuration, err := time.ParseDuration(conf.FetchInterval)
if err != nil {
return nil, fmt.Errorf("fetch duration error: %v", err)
checkDuration, err := time.ParseDuration(conf.CheckInterval)
if err != nil {
return nil, fmt.Errorf("check duration error: %v", err)
for _, checker := range checkerRegistration {
if e := checker.Init(conf); e != nil {
err = multierror.Append(err, e)
if err != nil {
return nil, err
return &Checkers{
meterClient: meterClient,
continuousClient: continuousClient,
meterPrefix: conf.MeterPrefix,
fetchDuration: fetchDuration,
checkDuration: checkDuration,
processOperator: moduleMgr.FindModule(process.ModuleName).(process.Operator),
triggers: triggers,
policiesCache: make(map[string]*base.ServicePolicy),
ctx: ctx,
}, nil
// Start launches a background goroutine that drives the periodic work with two
// tickers, one per configured interval. On every fetch tick it calls
// fetchAllData; a failure is logged and not propagated.
// NOTE(review): the bodies of the checkTicker and ctx.Done cases are not visible
// in this view. Presumably the former checks the thresholds and the latter stops
// the tickers and leaves the loop; confirm against the full file.
func (c *Checkers) Start() {
// starting to check the threshold with interval
go func() {
fetchTicker := time.NewTicker(c.fetchDuration)
checkTicker := time.NewTicker(c.checkDuration)
for {
select {
case <-fetchTicker.C:
if err := c.fetchAllData(); err != nil {
log.Errorf("fetch all data error: %v", err)
case <-checkTicker.C:
case <-c.ctx.Done():
func (c *Checkers) Stop() error {
var err error
for _, checker := range checkerRegistration {
if e := checker.Close(); e != nil {
err = multierror.Append(err, e)
return err
// CheckProfilingPolicies refreshes the policy cache and, when something changed,
// flattens every cached service policy into base.SyncPolicyWithProcesses entries:
// one entry per policy, paired with the processes of its service.
// It returns the error reported by updatePolicyCache, and returns nil without
// further work when the cache reports no update.
// NOTE(review): the body of the final loop over checkerRegistration is not
// visible in this view. Presumably each checker receives policiesWithProcesses;
// confirm against the full file.
func (c *Checkers) CheckProfilingPolicies() error {
// fetch and update the policies
if hasUpdate, err := c.updatePolicyCache(); err != nil {
return err
} else if !hasUpdate {
return nil
// synchronized to all checkers
policiesWithProcesses := make([]*base.SyncPolicyWithProcesses, 0)
for _, servicePolicy := range c.policiesCache {
for _, policy := range servicePolicy.Policies {
policiesWithProcesses = append(policiesWithProcesses, &base.SyncPolicyWithProcesses{
Policy: policy,
Processes: servicePolicy.Processes,
for _, checker := range checkerRegistration {
return nil
func (c *Checkers) fetchAllData() error {
var err error
for _, checker := range checkerRegistration {
if e := checker.Fetch(); e != nil {
err = multierror.Append(err, e)
return err
// checkAllThresholds runs every checker through findAllMatchCauses, collecting
// their metrics in an appender created with the configured meter prefix, and then
// flushes that appender to the meter client. A flush failure is only logged as a
// warning.
// NOTE(review): what follows the len(causes) == 0 test is not visible in this
// view, so how the collected causes are consumed cannot be confirmed from here.
func (c *Checkers) checkAllThresholds() {
// check all thresholds and send metrics
metricsAppender := base.NewMetricsAppender(c.meterPrefix)
causes := c.findAllMatchCauses(metricsAppender)
if e := metricsAppender.Flush(c.ctx, c.meterClient); e != nil {
log.Warnf("flush the checker metrics failure: %v", e)
if len(causes) == 0 {
func (c *Checkers) findAllMatchCauses(appender *base.MetricsAppender) []base.ThresholdCause {
causes := make([]base.ThresholdCause, 0)
for _, checker := range checkerRegistration {
overThresholds := checker.Check(c, appender)
if len(overThresholds) == 0 {
causes = append(causes, overThresholds...)
return causes
func (c *Checkers) ShouldCheck(p api.ProcessInterface, item *base.PolicyItem) bool {
profilingType := item.Policy.TargetProfilingType
trigger := triggerRegistration[profilingType]
return trigger.ShouldTrigger(p)
func (c *Checkers) updatePolicyCache() (bool, error) {
processes := c.processOperator.FindAllRegisteredProcesses()
if len(processes) == 0 {
// if existing policies, then clean it
if (len(c.policiesCache)) > 0 {
c.policiesCache = make(map[string]*base.ServicePolicy)
return true, nil
return false, nil
serviceProcesses := make(map[string]map[string]api.ProcessInterface)
// get all existing service and policy UUID mapping
servicePolicyUUIDCache := make(map[string]string, 0)
for _, p := range processes {
serviceName := p.Entity().ServiceName
cachedPolicy := c.policiesCache[serviceName]
if cachedPolicy != nil {
servicePolicyUUIDCache[serviceName] = cachedPolicy.UUID
} else {
servicePolicyUUIDCache[serviceName] = ""
// build the service process
serviceProcessesMap := serviceProcesses[serviceName]
if serviceProcessesMap == nil {
serviceProcessesMap = make(map[string]api.ProcessInterface)
serviceProcesses[serviceName] = serviceProcessesMap
serviceProcessesMap[p.ID()] = p
policiesUpdates, err := c.queryPolicyUpdates(servicePolicyUUIDCache)
if err != nil {
return false, err
hasUpdate := false
for serviceName, policy := range policiesUpdates {
existingPolicy := c.policiesCache[serviceName]
// update cache if the service policy not exist or UUID are not same
if existingPolicy == nil || existingPolicy.UUID != policy.UUID {
existingPolicy = policy
c.policiesCache[serviceName] = policy
hasUpdate = true
// update the processes if they are not same
if !c.checkProcessesAreSame(existingPolicy.Processes, serviceProcesses[serviceName]) {
hasUpdate = true
existingPolicy.Processes = serviceProcesses[serviceName]
return hasUpdate, nil
func (c *Checkers) checkProcessesAreSame(from, target map[string]api.ProcessInterface) bool {
if len(from) != len(target) {
return false
// all process id have same pid
for processID, targetProcess := range target {
if fromProcess := from[processID]; fromProcess == nil {
return false
} else if fromProcess.Pid() != targetProcess.Pid() {
return false
return true
// queryPolicyUpdates asks the backend for the policies of the given services.
// servicePolicies maps a service name to the policy UUID sent along with the
// query. The result maps a service name to the policy rebuilt from the response;
// it is nil (with a nil error) when the backend returns no commands.
// An error is returned when the query fails, when no "ServiceWithPolicyJSON"
// value could be read from the response, or when that value cannot be
// unmarshalled.
func (c *Checkers) queryPolicyUpdates(servicePolicies map[string]string) (map[string]*base.ServicePolicy, error) {
// one query entry per service, carrying the UUID supplied by the caller
queries := make([]*profilingv3.ContinuousProfilingServicePolicyQuery, 0)
for k, v := range servicePolicies {
queries = append(queries, &profilingv3.ContinuousProfilingServicePolicyQuery{
ServiceName: k,
Uuid: v,
policyUpdateCommands, err := c.continuousClient.QueryPolicies(c.ctx, &profilingv3.ContinuousProfilingPolicyQuery{Policies: queries})
if err != nil {
return nil, err
// no update
if len(policyUpdateCommands.GetCommands()) == 0 {
return nil, nil
// the payload is only read when the response is exactly one
// "ContinuousProfilingPolicyQuery" command
var policyJSON string
if len(policyUpdateCommands.GetCommands()) == 1 && policyUpdateCommands.GetCommands()[0].GetCommand() == "ContinuousProfilingPolicyQuery" {
for _, arg := range policyUpdateCommands.GetCommands()[0].GetArgs() {
if arg.GetKey() == "ServiceWithPolicyJSON" {
policyJSON = arg.GetValue()
if policyJSON == "" {
return nil, fmt.Errorf("the query policy response not adapt")
updates := make([]*QueryPolicyUpdate, 0)
err = json.Unmarshal([]byte(policyJSON), &updates)
if err != nil {
return nil, fmt.Errorf("error to unmarshal the policy updates: %v", err)
// convert the wire format into base.ServicePolicy values keyed by service name
result := make(map[string]*base.ServicePolicy)
for _, update := range updates {
servicePolicy := &base.ServicePolicy{
Service: update.ServiceName,
UUID: update.UUID,
for profilingType, checks := range update.Profiling {
policy := &base.Policy{
TargetProfilingType: profilingType,
Items: make(map[base.CheckType]*base.PolicyItem),
ServicePolicy: servicePolicy,
for checkType, item := range checks {
// NOTE(review): the statement following this warning is not visible in this
// view; the log text suggests an invalid item is skipped rather than added.
// Confirm against the full file.
if err := item.Validate(); err != nil {
log.Warnf("cannot add the policy item, service name: %s, profiling type: %s, policy type: %s, error: %v",
update.ServiceName, profilingType, checkType, err)
policy.Items[checkType] = &base.PolicyItem{
Threshold: item.Threshold,
Period: item.Period,
Count: item.Count,
URIList: item.URIList,
URIRegex: item.URIRegex,
// back-reference from the item to its owning policy
Policy: policy,
servicePolicy.Policies = append(servicePolicy.Policies, policy)
result[update.ServiceName] = servicePolicy
return result, nil
// QueryPolicyUpdate is the per-service entry decoded from the
// "ServiceWithPolicyJSON" payload of a policy query response.
type QueryPolicyUpdate struct {
// ServiceName is the service the policy applies to.
ServiceName string `json:"ServiceName"`
// UUID identifies the policy version; it is compared with the cached UUID to detect changes.
UUID string `json:"UUID"`
// Profiling maps a target profiling type to its check items, keyed by check type.
// The field has no json tag, so it is decoded under its Go field name.
Profiling map[base.TargetProfilingType]map[base.CheckType]*QueryPolicyUpdateItem
// QueryPolicyUpdateItem is a single check rule of a policy as delivered in the
// policy JSON. Threshold, Period and Count are constrained by Validate; URIList
// and URIRegex are carried along without validation.
type QueryPolicyUpdateItem struct {
	Threshold string   `json:"Threshold"`
	Period    int      `json:"Period"`
	Count     int      `json:"Count"`
	URIList   []string `json:"URIList"`
	URIRegex  string   `json:"URIRegex"`
}

// Validate checks that the item is usable as a policy rule.
//
// It returns an error when the item itself is nil (a JSON null entry decodes to
// a nil pointer, which must be rejected rather than dereferenced), when
// Threshold is empty, when Period or Count is not positive, or when Count
// exceeds Period. It returns nil for a valid item.
func (p *QueryPolicyUpdateItem) Validate() error {
	switch {
	case p == nil:
		return fmt.Errorf("policy item cannot be empty")
	case p.Threshold == "":
		return fmt.Errorf("threshold cannot be empty")
	case p.Period <= 0:
		return fmt.Errorf("period cannot smaller or equals zero")
	case p.Count <= 0:
		return fmt.Errorf("count cannot smaller or equals zero")
	case p.Count > p.Period:
		return fmt.Errorf("count cannot be bigger than period")
	}
	return nil
}