blob: 81d0811e1af2358ef9a78776b49aa34dfd74a5aa [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubemesh
import (
"fmt"
)
import (
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/log"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/configmapwatcher"
)
// NewConfigMapWatcher creates a new Watcher for changes to the given ConfigMap.
func NewConfigMapWatcher(client kube.Client, namespace, name, key string, multiWatch bool, stop <-chan struct{}) *mesh.MultiWatcher {
w := mesh.NewMultiWatcher(mesh.DefaultMeshConfig())
c := configmapwatcher.NewController(client, namespace, name, func(cm *v1.ConfigMap) {
meshNetworks, err := ReadNetworksConfigMap(cm, "meshNetworks")
if err != nil {
// Keep the last known config in case there's a misconfiguration issue.
log.Warnf("failed to read mesh config from ConfigMap: %v", err)
return
}
if meshNetworks != nil {
w.SetNetworks(meshNetworks)
}
if multiWatch {
meshConfig := meshConfigMapData(cm, key)
w.HandleMeshConfigData(meshConfig)
return
}
// Original behavior - just per-revision config
meshConfig, err := ReadConfigMap(cm, key)
if err != nil {
// Keep the last known config in case there's a misconfiguration issue.
log.Warnf("failed to read mesh config from ConfigMap: %v", err)
return
}
w.HandleMeshConfig(meshConfig)
})
go c.Run(stop)
// Ensure the ConfigMap is initially loaded if present.
if !cache.WaitForCacheSync(stop, c.HasSynced) {
log.Error("failed to wait for cache sync")
}
return w
}
func AddUserMeshConfig(client kube.Client, watcher mesh.Watcher, namespace, key, userMeshConfig string, stop chan struct{}) {
c := configmapwatcher.NewController(client, namespace, userMeshConfig, func(cm *v1.ConfigMap) {
meshConfig := meshConfigMapData(cm, key)
watcher.HandleUserMeshConfig(meshConfig)
})
go c.Run(stop)
if !cache.WaitForCacheSync(stop, c.HasSynced) {
log.Error("failed to wait for cache sync")
}
}
func meshConfigMapData(cm *v1.ConfigMap, key string) string {
if cm == nil {
return ""
}
cfgYaml, exists := cm.Data[key]
if !exists {
return ""
}
return cfgYaml
}
func ReadConfigMap(cm *v1.ConfigMap, key string) (*meshconfig.MeshConfig, error) {
if cm == nil {
log.Info("no ConfigMap found, using default MeshConfig config")
return mesh.DefaultMeshConfig(), nil
}
cfgYaml, exists := cm.Data[key]
if !exists {
return nil, fmt.Errorf("missing ConfigMap key %q", key)
}
meshConfig, err := mesh.ApplyMeshConfigDefaults(cfgYaml)
if err != nil {
return nil, fmt.Errorf("failed reading MeshConfig config: %v. YAML:\n%s", err, cfgYaml)
}
log.Info("Loaded MeshConfig config from Kubernetes API server.")
return meshConfig, nil
}
func ReadNetworksConfigMap(cm *v1.ConfigMap, key string) (*meshconfig.MeshNetworks, error) {
if cm == nil {
log.Info("no ConfigMap found, using existing MeshNetworks config")
return nil, nil
}
cfgYaml, exists := cm.Data[key]
if !exists {
return nil, nil
}
meshNetworks, err := mesh.ParseMeshNetworks(cfgYaml)
if err != nil {
return nil, fmt.Errorf("failed reading MeshNetworks config: %v. YAML:\n%s", err, cfgYaml)
}
log.Info("Loaded MeshNetworks config from Kubernetes API server.")
return meshNetworks, nil
}