blob: 6119b2fd2d16d9714f9c9358a652ee3cf1f8383c [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesh
import (
"reflect"
"sync"
"sync/atomic"
"time"
"unsafe"
)
import (
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/pkg/filewatcher"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
// Holder of a mesh configuration.
type Holder interface {
Mesh() *meshconfig.MeshConfig
}
// Watcher is a Holder whose mesh config can be updated asynchronously.
type Watcher interface {
Holder
// AddMeshHandler registers a callback handler for changes to the mesh config.
AddMeshHandler(func())
// HandleUserMeshConfig keeps track of user mesh config overrides. These are merged with the standard
// mesh config, which takes precedence.
HandleUserMeshConfig(string)
}
// MultiWatcher is a struct wrapping the internal injector to let users know that both
type MultiWatcher struct {
internalWatcher
internalNetworkWatcher
}
func NewMultiWatcher(config *meshconfig.MeshConfig) *MultiWatcher {
return &MultiWatcher{
internalWatcher: internalWatcher{MeshConfig: config},
}
}
var _ Watcher = &internalWatcher{}
type internalWatcher struct {
mutex sync.Mutex
handlers []func()
// Current merged mesh config
MeshConfig *meshconfig.MeshConfig
userMeshConfig string
revMeshConfig string
}
// NewFixedWatcher creates a new Watcher that always returns the given mesh config. It will never
// fire any events, since the config never changes.
func NewFixedWatcher(mesh *meshconfig.MeshConfig) Watcher {
return &internalWatcher{
MeshConfig: mesh,
}
}
// NewFileWatcher creates a new Watcher for changes to the given mesh config file. Returns an error
// if the given file does not exist or failed during parsing.
func NewFileWatcher(fileWatcher filewatcher.FileWatcher, filename string, multiWatch bool) (Watcher, error) {
meshConfigYaml, err := ReadMeshConfigData(filename)
if err != nil {
return nil, err
}
meshConfig, err := ApplyMeshConfigDefaults(meshConfigYaml)
if err != nil {
return nil, err
}
w := &internalWatcher{
MeshConfig: meshConfig,
revMeshConfig: meshConfigYaml,
}
// Watch the config file for changes and reload if it got modified
addFileWatcher(fileWatcher, filename, func() {
if multiWatch {
meshConfig, err := ReadMeshConfigData(filename)
if err != nil {
log.Warnf("failed to read mesh configuration, using default: %v", err)
return
}
w.HandleMeshConfigData(meshConfig)
return
}
// Reload the config file
meshConfig, err = ReadMeshConfig(filename)
if err != nil {
log.Warnf("failed to read mesh configuration, using default: %v", err)
return
}
w.HandleMeshConfig(meshConfig)
})
return w, nil
}
// Mesh returns the latest mesh config.
func (w *internalWatcher) Mesh() *meshconfig.MeshConfig {
return (*meshconfig.MeshConfig)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&w.MeshConfig))))
}
// AddMeshHandler registers a callback handler for changes to the mesh config.
func (w *internalWatcher) AddMeshHandler(h func()) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.handlers = append(w.handlers, h)
}
// HandleMeshConfigData keeps track of the standard mesh config. These are merged with the user
// mesh config, but takes precedence.
func (w *internalWatcher) HandleMeshConfigData(yaml string) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.revMeshConfig = yaml
merged := w.merged()
w.handleMeshConfigInternal(merged)
}
// HandleUserMeshConfig keeps track of user mesh config overrides. These are merged with the standard
// mesh config, which takes precedence.
func (w *internalWatcher) HandleUserMeshConfig(yaml string) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.userMeshConfig = yaml
merged := w.merged()
w.handleMeshConfigInternal(merged)
}
// merged returns the merged user and revision config.
func (w *internalWatcher) merged() *meshconfig.MeshConfig {
mc := DefaultMeshConfig()
if w.userMeshConfig != "" {
mc1, err := ApplyMeshConfig(w.userMeshConfig, mc)
if err != nil {
log.Errorf("user config invalid, ignoring it %v %s", err, w.userMeshConfig)
} else {
mc = mc1
log.Infof("Applied user config: %s", PrettyFormatOfMeshConfig(mc))
}
}
if w.revMeshConfig != "" {
mc1, err := ApplyMeshConfig(w.revMeshConfig, mc)
if err != nil {
log.Errorf("revision config invalid, ignoring it %v %s", err, w.userMeshConfig)
} else {
mc = mc1
log.Infof("Applied revision mesh config: %s", PrettyFormatOfMeshConfig(mc))
}
}
return mc
}
// HandleMeshConfig calls all handlers for a given mesh configuration update. This must be called
// with a lock on w.Mutex, or updates may be applied out of order.
func (w *internalWatcher) HandleMeshConfig(meshConfig *meshconfig.MeshConfig) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.handleMeshConfigInternal(meshConfig)
}
// handleMeshConfigInternal behaves the same as HandleMeshConfig but must be called under a lock
func (w *internalWatcher) handleMeshConfigInternal(meshConfig *meshconfig.MeshConfig) {
var handlers []func()
if !reflect.DeepEqual(meshConfig, w.MeshConfig) {
log.Infof("mesh configuration updated to: %s", PrettyFormatOfMeshConfig(meshConfig))
if !reflect.DeepEqual(meshConfig.ConfigSources, w.MeshConfig.ConfigSources) {
log.Info("mesh configuration sources have changed")
// TODO Need to recreate or reload initConfigController()
}
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&w.MeshConfig)), unsafe.Pointer(meshConfig))
handlers = append(handlers, w.handlers...)
}
// TODO hack: the first handler added is the ConfigPush, other handlers affect what will be pushed, so reversing iteration
for i := len(handlers) - 1; i >= 0; i-- {
handlers[i]()
}
}
// Add to the FileWatcher the provided file and execute the provided function
// on any change event for this file.
// Using a debouncing mechanism to avoid calling the callback multiple times
// per event.
func addFileWatcher(fileWatcher filewatcher.FileWatcher, file string, callback func()) {
_ = fileWatcher.Add(file)
go func() {
var timerC <-chan time.Time
for {
select {
case <-timerC:
timerC = nil
callback()
case <-fileWatcher.Events(file):
// Use a timer to debounce configuration updates
if timerC == nil {
timerC = time.After(100 * time.Millisecond)
}
}
}
}()
}
func PrettyFormatOfMeshConfig(meshConfig *meshconfig.MeshConfig) string {
meshConfigDump, _ := protomarshal.ToJSONWithIndent(meshConfig, " ")
return meshConfigDump
}