[navi] Add init mesh network config (#786)
diff --git a/navigator/pkg/bootstrap/mesh.go b/navigator/pkg/bootstrap/mesh.go
index 666f4e4..8c4f865 100644
--- a/navigator/pkg/bootstrap/mesh.go
+++ b/navigator/pkg/bootstrap/mesh.go
@@ -44,6 +44,25 @@
klog.Infof("flags: \n%s", argsdump)
}
+// initMeshNetworks builds the mesh networks collection, waits on it through
+// WaitUntilSynced(s.internalStop), and publishes it on the environment as the
+// NetworksWatcher. The resulting configuration is logged once at the end.
+func (s *Server) initMeshNetworks(args *NaviArgs, fileWatcher filewatcher.FileWatcher) {
+	klog.Infof("initializing mesh networks configuration %v", args.NetworksConfigFile)
+
+	networks := s.getMeshNetworks(args, fileWatcher)
+	networks.AsCollection().WaitUntilSynced(s.internalStop)
+	s.environment.NetworksWatcher = meshwatcher.NetworksAdapter(networks)
+
+	dump := meshwatcher.PrettyFormatOfMeshNetworks(s.environment.MeshNetworks())
+	klog.Infof("mesh networks configuration: %s", dump)
+}
+
+// getMeshNetworks assembles the mesh networks singleton from the configuration
+// sources found for args.NetworksConfigFile and kubemesh.MeshNetworksKey.
+func (s *Server) getMeshNetworks(args *NaviArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshNetworksResource] {
+	// Mesh networks are needed up-front, before anything else starts, so the
+	// collection is bound to internalStop instead of a task scheduled for later.
+	builder := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
+
+	networkSources := s.getConfigurationSources(args, fileWatcher, args.NetworksConfigFile, kubemesh.MeshNetworksKey)
+	if len(networkSources) == 0 {
+		klog.Infof("Using default mesh networks - missing file %s and no k8s client", args.NetworksConfigFile)
+	}
+
+	return meshwatcher.NewNetworksCollection(builder, networkSources...)
+}
+
func (s *Server) getMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshConfigResource] {
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
sources := s.getConfigurationSources(args, fileWatcher, args.MeshConfigFile, kubemesh.MeshConfigKey)
diff --git a/navigator/pkg/bootstrap/options.go b/navigator/pkg/bootstrap/options.go
index fd3d21a..b464899 100644
--- a/navigator/pkg/bootstrap/options.go
+++ b/navigator/pkg/bootstrap/options.go
@@ -40,14 +40,15 @@
}
type NaviArgs struct {
- ServerOptions DiscoveryServerOptions
- RegistryOptions RegistryOptions
- MeshConfigFile string
- PodName string
- Namespace string
- CtrlZOptions *ctrlz.Options
- KeepaliveOptions *keepalive.Options
- KrtDebugger *krt.DebugHandler `json:"-"`
+ ServerOptions DiscoveryServerOptions
+ RegistryOptions RegistryOptions
+ MeshConfigFile string
+ NetworksConfigFile string // mesh networks file; getMeshNetworks passes it to getConfigurationSources
+ PodName string
+ Namespace string
+ CtrlZOptions *ctrlz.Options
+ KeepaliveOptions *keepalive.Options
+ KrtDebugger *krt.DebugHandler `json:"-"`
}
type DiscoveryServerOptions struct {
diff --git a/navigator/pkg/bootstrap/server.go b/navigator/pkg/bootstrap/server.go
index 3882b72..a92e519 100644
--- a/navigator/pkg/bootstrap/server.go
+++ b/navigator/pkg/bootstrap/server.go
@@ -103,6 +103,8 @@
s.kubeClient = kubelib.SetObjectFilter(s.kubeClient, filter)
}
+ s.initMeshNetworks(args, s.fileWatcher)
+
return s, nil
}
diff --git a/navigator/pkg/model/context.go b/navigator/pkg/model/context.go
index 0c640a1..ed2c9e3 100644
--- a/navigator/pkg/model/context.go
+++ b/navigator/pkg/model/context.go
@@ -33,9 +33,10 @@
type Environment struct {
Watcher
- mutex sync.RWMutex
- pushContext *PushContext
- Cache XdsCache
+ mutex sync.RWMutex
+ pushContext *PushContext
+ Cache XdsCache
+ NetworksWatcher mesh.NetworksWatcher // read by MeshNetworks(); when nil, MeshNetworks() returns nil
}
type XdsCacheImpl struct {
@@ -78,6 +79,13 @@
return nil
}
+// MeshNetworks returns the networks configuration reported by the environment's
+// NetworksWatcher, or nil when the environment or its watcher is unset.
+func (e *Environment) MeshNetworks() *meshconfig.MeshNetworks {
+	if e == nil || e.NetworksWatcher == nil {
+		return nil
+	}
+	return e.NetworksWatcher.Networks()
+}
+
func (e *Environment) AddMeshHandler(h func()) {
if e != nil && e.Watcher != nil {
e.Watcher.AddMeshHandler(h)
diff --git a/pkg/config/mesh/kubemesh/watcher.go b/pkg/config/mesh/kubemesh/watcher.go
index bbc4305..d1e94ef 100644
--- a/pkg/config/mesh/kubemesh/watcher.go
+++ b/pkg/config/mesh/kubemesh/watcher.go
@@ -31,7 +31,8 @@
)
const (
- MeshConfigKey = "mesh"
+ MeshConfigKey = "mesh" // key handed to getConfigurationSources for the mesh config
+ MeshNetworksKey = "meshNetworks" // key handed to getConfigurationSources for mesh networks
)
// NewConfigMapSource builds a MeshConfigSource reading from ConfigMap "name" with key "key".
diff --git a/pkg/config/mesh/mesh.go b/pkg/config/mesh/mesh.go
index 23438ee..14b8113 100644
--- a/pkg/config/mesh/mesh.go
+++ b/pkg/config/mesh/mesh.go
@@ -20,6 +20,7 @@
import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/pkg/util/pointer"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
@@ -33,6 +34,12 @@
"time"
)
+// DefaultMeshNetworks returns a pointer to the default meshnetworks
+// configuration, which is the empty one built by EmptyMeshNetworks.
+func DefaultMeshNetworks() *meshconfig.MeshNetworks {
+	empty := EmptyMeshNetworks()
+	return ptr.Of(empty)
+}
+
func DefaultProxyConfig() *meshconfig.ProxyConfig {
return &meshconfig.ProxyConfig{
ConfigPath: constants.ConfigPathDir,
@@ -118,6 +125,27 @@
return defaultConfig, nil
}
+// EmptyMeshNetworks returns a configuration whose Networks map is allocated
+// but holds no entries.
+func EmptyMeshNetworks() meshconfig.MeshNetworks {
+	networks := map[string]*meshconfig.Network{}
+	return meshconfig.MeshNetworks{Networks: networks}
+}
+
+// ParseMeshNetworks applies the input YAML on top of an empty MeshNetworks and
+// returns the decoded configuration. When decoding fails it returns nil together
+// with the error prefixed by "failed to convert to proto.".
+func ParseMeshNetworks(yaml string) (*meshconfig.MeshNetworks, error) {
+	networks := EmptyMeshNetworks()
+
+	err := protomarshal.ApplyYAML(yaml, &networks)
+	if err != nil {
+		return nil, multierror.Prefix(err, "failed to convert to proto.")
+	}
+
+	// Validation is currently disabled:
+	// if err := agent.ValidateMeshNetworks(&networks); err != nil {
+	// 	return nil, err
+	// }
+	return &networks, nil
+}
+
func DefaultMeshConfig() *meshconfig.MeshConfig {
proxyConfig := DefaultProxyConfig()
return &meshconfig.MeshConfig{
diff --git a/pkg/config/mesh/meshwatcher/collection.go b/pkg/config/mesh/meshwatcher/collection.go
index 8b1f1a5..35780e8 100644
--- a/pkg/config/mesh/meshwatcher/collection.go
+++ b/pkg/config/mesh/meshwatcher/collection.go
@@ -69,3 +69,28 @@
}, opts.WithName("MeshConfig")...,
)
}
+
+// NewNetworksCollection builds a meshnetworks singleton from the provided sources.
+// Sources are consulted in order and the first one that yields a value decides the
+// result; there is no merging across sources. When no source yields a value the
+// resource wraps a nil MeshNetworks. When the winning source fails to parse, the
+// error is logged, ctx.DiscardResult() is called, and a resource wrapping
+// mesh.DefaultMeshNetworks() is returned. Passing more than 2 sources panics.
+func NewNetworksCollection(opts krt.OptionsBuilder, sources ...MeshConfigSource) krt.Singleton[MeshNetworksResource] {
+	// There is no real reason for this limit other than to enforce we don't
+	// accidentally put more sources.
+	if len(sources) > 2 {
+		panic("currently only 2 sources are supported")
+	}
+
+	compute := func(ctx krt.HandlerContext) *MeshNetworksResource {
+		for _, source := range sources {
+			raw := krt.FetchOne(ctx, source.AsCollection())
+			if raw == nil {
+				continue
+			}
+			parsed, err := mesh.ParseMeshNetworks(*raw)
+			if err == nil {
+				return &MeshNetworksResource{MeshNetworks: parsed}
+			}
+			klog.Errorf("invalid mesh networks, using last known state: %v", err)
+			ctx.DiscardResult()
+			return &MeshNetworksResource{MeshNetworks: mesh.DefaultMeshNetworks()}
+		}
+		return &MeshNetworksResource{MeshNetworks: nil}
+	}
+	return krt.NewSingleton[MeshNetworksResource](compute, opts.WithName("MeshNetworks")...)
+}
diff --git a/pkg/config/mesh/meshwatcher/mesh.go b/pkg/config/mesh/meshwatcher/mesh.go
index a1cb200..c311ed9 100644
--- a/pkg/config/mesh/meshwatcher/mesh.go
+++ b/pkg/config/mesh/meshwatcher/mesh.go
@@ -73,7 +73,28 @@
return proto.Equal(m.MeshConfig, other.MeshConfig)
}
+// MeshNetworksResource is the krt resource wrapping the current MeshNetworks state.
+type MeshNetworksResource struct {
+	*meshconfig.MeshNetworks
+}
+
+// ResourceName returns the fixed name "MeshNetworksResource".
+func (m MeshNetworksResource) ResourceName() string {
+	return "MeshNetworksResource"
+}
+
+// Equals reports whether both resources wrap proto-equal MeshNetworks messages.
+func (m MeshNetworksResource) Equals(other MeshNetworksResource) bool {
+	mine, theirs := m.MeshNetworks, other.MeshNetworks
+	return proto.Equal(mine, theirs)
+}
+
+// NetworksAdapter exposes a MeshNetworks singleton through the
+// mesh.NetworksWatcher interface.
+func NetworksAdapter(configuration krt.Singleton[MeshNetworksResource]) mesh.NetworksWatcher {
+	return networksAdapter{Singleton: configuration}
+}
+
+// PrettyFormatOfMeshConfig returns the output of protomarshal.ToYAML for
+// meshConfig; the error returned by ToYAML is ignored.
func PrettyFormatOfMeshConfig(meshConfig *meshconfig.MeshConfig) string {
meshConfigDump, _ := protomarshal.ToYAML(meshConfig)
return meshConfigDump
}
+
+// PrettyFormatOfMeshNetworks returns the output of protomarshal.ToYAML for
+// meshNetworks; the error returned by ToYAML is ignored.
+func PrettyFormatOfMeshNetworks(meshNetworks *meshconfig.MeshNetworks) string {
+	dump, _ := protomarshal.ToYAML(meshNetworks)
+	return dump
+}
diff --git a/pkg/config/mesh/meshwatcher/networks.go b/pkg/config/mesh/meshwatcher/networks.go
new file mode 100644
index 0000000..6362e60
--- /dev/null
+++ b/pkg/config/mesh/meshwatcher/networks.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package meshwatcher
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+)
+
+// networksAdapter embeds a MeshNetworksResource singleton and implements
+// mesh.NetworksWatcher on top of it.
+type networksAdapter struct {
+ krt.Singleton[MeshNetworksResource]
+}
+
+// Networks returns the MeshNetworks held by the wrapped singleton.
+// It returns nil when the singleton has no value yet, rather than
+// dereferencing a nil resource; the wrapped MeshNetworks itself may also be nil.
+func (n networksAdapter) Networks() *meshconfig.MeshNetworks {
+	current := n.Singleton.Get()
+	if current == nil {
+		return nil
+	}
+	return current.MeshNetworks
+}
+
+// AddNetworksHandler registers h as a batch handler on the networks collection;
+// h is called once per delivered batch, ignoring the events themselves. The
+// returned registration wraps a callback that unregisters the batch handler.
+func (n networksAdapter) AddNetworksHandler(h func()) *mesh.WatcherHandlerRegistration {
+	onBatch := func(_ []krt.Event[MeshNetworksResource]) {
+		h()
+	}
+	batchReg := n.Singleton.AsCollection().RegisterBatch(onBatch, false)
+
+	return mesh.NewWatcherHandlerRegistration(func() {
+		batchReg.UnregisterHandler()
+	})
+}
+
+// DeleteNetworksHandler drops a handler by calling Remove on the registration
+// obtained from AddNetworksHandler. The adapter itself is not consulted.
+func (networksAdapter) DeleteNetworksHandler(registration *mesh.WatcherHandlerRegistration) {
+	registration.Remove()
+}
+
+var _ mesh.NetworksWatcher = networksAdapter{} // compile-time check that networksAdapter implements mesh.NetworksWatcher
diff --git a/pkg/config/mesh/watchers.go b/pkg/config/mesh/watchers.go
index 45bbcab..38ae18e 100644
--- a/pkg/config/mesh/watchers.go
+++ b/pkg/config/mesh/watchers.go
@@ -21,6 +21,17 @@
"istio.io/api/mesh/v1alpha1"
)
+// NetworksWatcher watches changes to the mesh networks config.
+type NetworksWatcher interface {
+ // Networks returns the current mesh networks config; the result may be nil.
+ Networks() *v1alpha1.MeshNetworks
+
+ // AddNetworksHandler registers a callback handler for changes to the networks config.
+ // The returned registration is what DeleteNetworksHandler takes to unregister it.
+ AddNetworksHandler(func()) *WatcherHandlerRegistration
+
+ // DeleteNetworksHandler unregisters a callback handler when remote cluster is removed.
+ DeleteNetworksHandler(registration *WatcherHandlerRegistration)
+}
+
// Holder of a mesh configuration.
type Holder interface {
Mesh() *v1alpha1.MeshConfig
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index b4a2d66..e9baa49 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package namespace
import (