blob: c065250b9d51ac636f6a284c6f2c19a67db7f672 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"context"
"fmt"
"io"
"os"
"strings"
"time"
)
import (
"github.com/hashicorp/go-multierror"
"github.com/ryanuber/go-glob"
"istio.io/api/annotation"
"istio.io/api/mesh/v1alpha1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/file"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/kube/crdclient"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/diag"
"github.com/apache/dubbo-go-pixiu/pkg/config/analysis/scope"
mesh_const "github.com/apache/dubbo-go-pixiu/pkg/config/legacy/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/legacy/util/kuberesource"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/resource"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// IstiodAnalyzer handles local analysis of k8s event sources, both live and file-based
type IstiodAnalyzer struct {
// internalStore stores synthetic configs for analysis (mesh config, etc)
internalStore model.ConfigStore
// stores contains all the (non file) config sources to analyze
stores []model.ConfigStoreController
// fileSource contains all file bases sources
fileSource *file.KubeSource
analyzer *analysis.CombinedAnalyzer
namespace resource.Namespace
istioNamespace resource.Namespace
initializedStore model.ConfigStoreController
// List of code and resource suppressions to exclude messages on
suppressions []AnalysisSuppression
// Mesh config for this analyzer. This can come from multiple sources, and the last added version will take precedence.
meshCfg *v1alpha1.MeshConfig
// Mesh networks config for this analyzer.
meshNetworks *v1alpha1.MeshNetworks
// Which kube resources are used by this analyzer
// Derived from metadata and the specified analyzer and transformer providers
kubeResources collection.Schemas
// Hook function called when a collection is used in analysis
collectionReporter CollectionReporterFn
clientsToRun []kubelib.Client
}
// NewSourceAnalyzer is a drop-in replacement for the galley function, adapting to istiod analyzer.
func NewSourceAnalyzer(analyzer *analysis.CombinedAnalyzer, namespace, istioNamespace resource.Namespace,
cr CollectionReporterFn, serviceDiscovery bool, _ time.Duration) *IstiodAnalyzer {
return NewIstiodAnalyzer(analyzer, namespace, istioNamespace, cr, serviceDiscovery)
}
// NewIstiodAnalyzer creates a new IstiodAnalyzer with no sources. Use the Add*Source
// methods to add sources in ascending precedence order,
// then execute Analyze to perform the analysis
func NewIstiodAnalyzer(analyzer *analysis.CombinedAnalyzer, namespace,
istioNamespace resource.Namespace, cr CollectionReporterFn, serviceDiscovery bool) *IstiodAnalyzer {
// collectionReporter hook function defaults to no-op
if cr == nil {
cr = func(collection.Name) {}
}
// Get the closure of all input collections for our analyzer, paying attention to transforms
kubeResources := kuberesource.SkipExcludedCollections(
analyzer.Metadata().Inputs,
kuberesource.DefaultExcludedResourceKinds(),
serviceDiscovery)
mcfg := mesh.DefaultMeshConfig()
sa := &IstiodAnalyzer{
meshCfg: mcfg,
meshNetworks: mesh.DefaultMeshNetworks(),
analyzer: analyzer,
namespace: namespace,
internalStore: memory.Make(collection.SchemasFor(collections.IstioMeshV1Alpha1MeshNetworks, collections.IstioMeshV1Alpha1MeshConfig)),
istioNamespace: istioNamespace,
kubeResources: kubeResources,
collectionReporter: cr,
}
return sa
}
// ReAnalyze loads the sources and executes the analysis, assuming init is already called
func (sa *IstiodAnalyzer) ReAnalyze(cancel <-chan struct{}) (AnalysisResult, error) {
var result AnalysisResult
store := sa.initializedStore
result.ExecutedAnalyzers = sa.analyzer.AnalyzerNames()
result.SkippedAnalyzers = sa.analyzer.RemoveSkipped(store.Schemas())
cache.WaitForCacheSync(cancel,
store.HasSynced)
ctx := NewContext(store, cancel, sa.collectionReporter)
sa.analyzer.Analyze(ctx)
namespaces := make(map[resource.Namespace]struct{})
if sa.namespace != "" {
namespaces[sa.namespace] = struct{}{}
}
// TODO: analysis is run for all namespaces, even if they are requested to be filtered.
msgs := filterMessages(ctx.(*istiodContext).messages, namespaces, sa.suppressions)
result.Messages = msgs.SortedDedupedCopy()
return result, nil
}
// Analyze loads the sources and executes the analysis
func (sa *IstiodAnalyzer) Analyze(cancel <-chan struct{}) (AnalysisResult, error) {
err2 := sa.Init(cancel)
if err2 != nil {
return AnalysisResult{}, err2
}
return sa.ReAnalyze(cancel)
}
func (sa *IstiodAnalyzer) Init(cancel <-chan struct{}) error {
// We need at least one non-meshcfg source
if len(sa.stores) == 0 && sa.fileSource == nil {
return fmt.Errorf("at least one file and/or Kubernetes source must be provided")
}
// TODO: there's gotta be a better way to convert v1meshconfig to config.Config...
// Create a store containing mesh config. There should be exactly one.
_, err := sa.internalStore.Create(config.Config{
Meta: config.Meta{
Name: mesh_const.MeshConfigResourceName.Name.String(),
Namespace: mesh_const.MeshConfigResourceName.Namespace.String(),
GroupVersionKind: collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind(),
},
Spec: sa.meshCfg,
})
if err != nil {
return fmt.Errorf("something unexpected happened while creating the meshconfig: %s", err)
}
// Create a store containing meshnetworks. There should be exactly one.
_, err = sa.internalStore.Create(config.Config{
Meta: config.Meta{
Name: mesh_const.MeshNetworksResourceName.Name.String(),
Namespace: mesh_const.MeshNetworksResourceName.Namespace.String(),
GroupVersionKind: collections.IstioMeshV1Alpha1MeshNetworks.Resource().GroupVersionKind(),
},
Spec: sa.meshNetworks,
})
if err != nil {
return fmt.Errorf("something unexpected happened while creating the meshnetworks: %s", err)
}
allstores := append(sa.stores, dfCache{ConfigStore: sa.internalStore})
if sa.fileSource != nil {
allstores = append(allstores, sa.fileSource)
}
for _, c := range sa.clientsToRun {
// TODO: this could be parallel
c.RunAndWait(cancel)
}
store, err := aggregate.MakeWriteableCache(allstores, nil)
if err != nil {
return err
}
go store.Run(cancel)
sa.initializedStore = store
return nil
}
type dfCache struct {
model.ConfigStore
}
func (d dfCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
panic("implement me")
}
// Run intentionally left empty
func (d dfCache) Run(_ <-chan struct{}) {
}
func (d dfCache) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
panic("implement me")
}
func (d dfCache) HasSynced() bool {
return true
}
// SetSuppressions will set the list of suppressions for the analyzer. Any
// resource that matches the provided suppression will not be included in the
// final message output.
func (sa *IstiodAnalyzer) SetSuppressions(suppressions []AnalysisSuppression) {
sa.suppressions = suppressions
}
// AddReaderKubeSource adds a source based on the specified k8s yaml files to the current IstiodAnalyzer
func (sa *IstiodAnalyzer) AddReaderKubeSource(readers []ReaderSource) error {
var src *file.KubeSource
if sa.fileSource != nil {
src = sa.fileSource
} else {
src = file.NewKubeSource(sa.kubeResources)
sa.fileSource = src
}
src.SetDefaultNamespace(sa.namespace)
var errs error
// If we encounter any errors reading or applying files, track them but attempt to continue
for _, r := range readers {
by, err := io.ReadAll(r.Reader)
if err != nil {
errs = multierror.Append(errs, err)
continue
}
if err = src.ApplyContent(r.Name, string(by)); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}
// AddRunningKubeSource adds a source based on a running k8s cluster to the current IstiodAnalyzer
// Also tries to get mesh config from the running cluster, if it can
func (sa *IstiodAnalyzer) AddRunningKubeSource(c kubelib.Client) {
sa.AddRunningKubeSourceWithRevision(c, "default")
}
func (sa *IstiodAnalyzer) AddRunningKubeSourceWithRevision(c kubelib.Client, revision string) {
// TODO: are either of these string constants intended to vary?
// This gets us only istio/ ones
store, err := crdclient.NewForSchemas(c, revision, "cluster.local", sa.kubeResources)
// RunAndWait must be called after NewForSchema so that the informers are all created and started.
if err != nil {
scope.Analysis.Errorf("error adding kube crdclient: %v", err)
return
}
sa.stores = append(sa.stores, store)
err = store.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// failed resources will never be synced, which causes the process to hang indefinitely.
// better to fail fast, and get a good idea for the failure.
scope.Analysis.Errorf("Failed to watch crd resource for analysis: %s", err)
})
if err != nil {
scope.Analysis.Errorf("error setting up error handling for kube crdclient: %v", err)
return
}
sa.clientsToRun = append(sa.clientsToRun, c)
// Since we're using a running k8s source, try to get meshconfig and meshnetworks from the configmap.
if err := sa.addRunningKubeIstioConfigMapSource(c); err != nil {
_, err := c.CoreV1().Namespaces().Get(context.TODO(), sa.istioNamespace.String(), metav1.GetOptions{})
if kerrors.IsNotFound(err) {
// An AnalysisMessage already show up to warn the absence of dubbo-system namespace, so making it debug level.
scope.Analysis.Debugf("%v namespace not found. Istio may not be installed in the target cluster. "+
"Using default mesh configuration values for analysis", sa.istioNamespace.String())
} else if err != nil {
scope.Analysis.Errorf("error getting mesh config from running kube source: %v", err)
}
}
}
// AddSource adds a source based on user supplied configstore to the current IstiodAnalyzer
// Assumes that the source has same or subset of resource types that this analyzer is configured with.
// This can be used by external users who import the analyzer as a module within their own controllers.
func (sa *IstiodAnalyzer) AddSource(src model.ConfigStoreController) {
sa.stores = append(sa.stores, src)
}
// AddFileKubeMeshConfig gets mesh config from the specified yaml file
func (sa *IstiodAnalyzer) AddFileKubeMeshConfig(file string) error {
by, err := os.ReadFile(file)
if err != nil {
return err
}
cfg, err := mesh.ApplyMeshConfigDefaults(string(by))
if err != nil {
return err
}
sa.meshCfg = cfg
return nil
}
// AddFileKubeMeshNetworks gets a file meshnetworks and add it to the analyzer.
func (sa *IstiodAnalyzer) AddFileKubeMeshNetworks(file string) error {
mn, err := mesh.ReadMeshNetworks(file)
if err != nil {
return err
}
sa.meshNetworks = mn
return nil
}
// AddDefaultResources adds some basic dummy Istio resources, based on mesh configuration.
// This is useful for files-only analysis cases where we don't expect the user to be including istio system resources
// and don't want to generate false positives because they aren't there.
// Respect mesh config when deciding which default resources should be generated
func (sa *IstiodAnalyzer) AddDefaultResources() error {
var readers []ReaderSource
if sa.meshCfg.GetIngressControllerMode() != v1alpha1.MeshConfig_OFF {
ingressResources, err := getDefaultIstioIngressGateway(sa.istioNamespace.String(), sa.meshCfg.GetIngressService())
if err != nil {
return err
}
readers = append(readers, ReaderSource{Reader: strings.NewReader(ingressResources), Name: "internal-ingress"})
}
if len(readers) == 0 {
return nil
}
return sa.AddReaderKubeSource(readers)
}
func (sa *IstiodAnalyzer) addRunningKubeIstioConfigMapSource(client kubelib.Client) error {
meshConfigMap, err := client.CoreV1().ConfigMaps(string(sa.istioNamespace)).Get(context.TODO(), meshConfigMapName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not read configmap %q from namespace %q: %v", meshConfigMapName, sa.istioNamespace, err)
}
configYaml, ok := meshConfigMap.Data[meshConfigMapKey]
if !ok {
return fmt.Errorf("missing config map key %q", meshConfigMapKey)
}
cfg, err := mesh.ApplyMeshConfigDefaults(configYaml)
if err != nil {
return fmt.Errorf("error parsing mesh config: %v", err)
}
sa.meshCfg = cfg
meshNetworksYaml, ok := meshConfigMap.Data[meshNetworksMapKey]
if !ok {
return fmt.Errorf("missing config map key %q", meshNetworksMapKey)
}
mn, err := mesh.ParseMeshNetworks(meshNetworksYaml)
if err != nil {
return fmt.Errorf("error parsing mesh networks: %v", err)
}
sa.meshNetworks = mn
return nil
}
// CollectionReporterFn is a hook function called whenever a collection is accessed through the AnalyzingDistributor's context
type CollectionReporterFn func(collection.Name)
// copied from processing/snapshotter/analyzingdistributor.go
func filterMessages(messages diag.Messages, namespaces map[resource.Namespace]struct{}, suppressions []AnalysisSuppression) diag.Messages {
nsNames := sets.New()
for k := range namespaces {
nsNames.Insert(k.String())
}
var msgs diag.Messages
FilterMessages:
for _, m := range messages {
// Only keep messages for resources in namespaces we want to analyze if the
// message doesn't have an origin (meaning we can't determine the
// namespace). Also kept are cluster-level resources where the namespace is
// the empty string. If no such limit is specified, keep them all.
if len(namespaces) > 0 && m.Resource != nil && m.Resource.Origin.Namespace() != "" {
if !nsNames.Contains(m.Resource.Origin.Namespace().String()) {
continue FilterMessages
}
}
// Filter out any messages on resources with suppression annotations.
if m.Resource != nil && m.Resource.Metadata.Annotations[annotation.GalleyAnalyzeSuppress.Name] != "" {
for _, code := range strings.Split(m.Resource.Metadata.Annotations[annotation.GalleyAnalyzeSuppress.Name], ",") {
if code == "*" || m.Type.Code() == code {
scope.Analysis.Debugf("Suppressing code %s on resource %s due to resource annotation", m.Type.Code(), m.Resource.Origin.FriendlyName())
continue FilterMessages
}
}
}
// Filter out any messages that match our suppressions.
for _, s := range suppressions {
if m.Resource == nil || s.Code != m.Type.Code() {
continue
}
if !glob.Glob(s.ResourceName, m.Resource.Origin.FriendlyName()) {
continue
}
scope.Analysis.Debugf("Suppressing code %s on resource %s due to suppressions list", m.Type.Code(), m.Resource.Origin.FriendlyName())
continue FilterMessages
}
msgs = append(msgs, m)
}
return msgs
}
// AnalysisSuppression describes a resource and analysis code to be suppressed
// (e.g. ignored) during analysis. Used when a particular message code is to be
// ignored for a specific resource.
type AnalysisSuppression struct {
// Code is the analysis code to suppress (e.g. "IST0104").
Code string
// ResourceName is the name of the resource to suppress the message for. For
// K8s resources it has the same form as used by istioctl (e.g.
// "DestinationRule default.dubbo-system"). Note that globbing wildcards are
// supported (e.g. "DestinationRule *.dubbo-system").
ResourceName string
}
// ReaderSource is a tuple of a io.Reader and filepath.
type ReaderSource struct {
// Name is the name of the source (commonly the path to a file, but can be "-" for sources read from stdin or "" if completely synthetic).
Name string
// Reader is the reader instance to use.
Reader io.Reader
}