blob: 0b62ca3767ecdbd975e473a23a49a33f1bff66df [file] [log] [blame]
/*
Copyright Istio Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package incluster
import (
"fmt"
"strings"
"time"
)
import (
v1alpha12 "istio.io/api/analysis/v1alpha1"
"istio.io/api/meta/v1alpha1"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/kube/crdclient"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/status"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/analyzers"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/diag"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/local"
"github.com/apache/dubbo-go-pixiu/pkg/config/resource"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
)
// Controller manages repeatedly running analyzers in istiod, and reporting results
// via istio status fields.
type Controller struct {
analyzer *local.IstiodAnalyzer
statusctl *status.Controller
}
func NewController(stop <-chan struct{}, rwConfigStore model.ConfigStoreController,
kubeClient kube.Client, namespace string, statusManager *status.Manager, domainSuffix string) (*Controller, error) {
ia := local.NewIstiodAnalyzer(analyzers.AllCombined(),
"", resource.Namespace(namespace), func(name collection.Name) {}, true)
ia.AddSource(rwConfigStore)
// Filter out configs watched by rwConfigStore so we don't watch multiple times
store, err := crdclient.NewForSchemas(kubeClient, "default",
domainSuffix, collections.All.Remove(rwConfigStore.Schemas().All()...))
if err != nil {
return nil, fmt.Errorf("unable to load common types for analysis, releasing lease: %v", err)
}
ia.AddSource(store)
kubeClient.RunAndWait(stop)
err = ia.Init(stop)
if err != nil {
return nil, fmt.Errorf("unable to initialize analysis controller, releasing lease: %s", err)
}
ctl := statusManager.CreateIstioStatusController(func(status *v1alpha1.IstioStatus, context interface{}) *v1alpha1.IstioStatus {
msgs := context.(diag.Messages)
// zero out analysis messages, as this is the sole controller for those
status.ValidationMessages = []*v1alpha12.AnalysisMessageBase{}
for _, msg := range msgs {
status.ValidationMessages = append(status.ValidationMessages, msg.AnalysisMessageBase())
}
return status
})
return &Controller{analyzer: ia, statusctl: ctl}, nil
}
// Run is blocking
func (c *Controller) Run(stop <-chan struct{}) {
t := time.NewTicker(features.AnalysisInterval)
oldmsgs := diag.Messages{}
for {
select {
case <-t.C:
res, err := c.analyzer.ReAnalyze(stop)
if err != nil {
log.Errorf("In-cluster analysis has failed: %s", err)
continue
}
// reorganize messages to map
index := map[status.Resource]diag.Messages{}
for _, m := range res.Messages {
key := status.ResourceFromMetadata(m.Resource.Metadata)
index[key] = append(index[key], m)
}
// if we previously had a message that has been removed, ensure it is removed
// TODO: this creates a state destruction problem when istiod crashes
// in that old messages may not be removed. Not sure how to fix this
// other than write every object's status every loop.
for _, m := range oldmsgs {
key := status.ResourceFromMetadata(m.Resource.Metadata)
if _, ok := index[key]; !ok {
index[key] = diag.Messages{}
}
}
for r, m := range index {
// don't try to write status for non-istio types
if strings.HasSuffix(r.Group, "istio.io") {
log.Debugf("enqueueing update for %s/%s", r.Namespace, r.Name)
c.statusctl.EnqueueStatusUpdateResource(m, r)
}
}
oldmsgs = res.Messages
log.Debugf("finished enqueueing all statuses")
case <-stop:
t.Stop()
break
}
}
}