blob: 4104014cbf25d83b1e4d0e1f4790da6e0bc4ab57 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"net"
)
import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
import (
configaggregate "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
controllermemory "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/serviceentry"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
)
// Server represents the XDS serving feature of Istiod (pilot).
// Unlike bootstrap/, this packet has no dependencies on K8S, CA,
// and other features. It'll be used initially in the istio-agent,
// to provide a minimal proxy while reusing the same code as istiod.
// Portions of the code will also be used in istiod - after it becomes
// stable the plan is to refactor bootstrap to use this code instead
// of directly bootstrapping XDS.
//
// The server support proxy/federation of multiple sources - last part
// or parity with MCP/Galley and MCP-over-XDS.
type SimpleServer struct {
// DiscoveryServer is the gRPC XDS implementation
// Env and MemRegistry are available as fields, as well as the default
// PushContext.
DiscoveryServer *DiscoveryServer
// MemoryStore is an in-memory config store, part of the aggregate store
// used by the discovery server.
MemoryConfigStore model.ConfigStore
// GRPCListener is the listener used for GRPC. For agent it is
// an insecure port, bound to 127.0.0.1
GRPCListener net.Listener
// syncCh is used for detecting if the stores have synced,
// which needs to happen before serving requests.
syncCh chan string
ConfigStoreCache model.ConfigStoreController
}
// Creates an basic, functional discovery server, using the same code as Istiod, but
// backed by an in-memory config and endpoint stores.
//
// Can be used in tests, or as a minimal XDS discovery server with no dependency on K8S or
// the complex bootstrap used by Istiod. A memory registry and memory config store are used to
// generate the configs - they can be programmatically updated.
func NewXDS(stop chan struct{}) *SimpleServer {
// Prepare a working XDS server, with aggregate config and registry stores and a memory store for each.
// TODO: refactor bootstrap code to use this server, and add more registries.
env := &model.Environment{
PushContext: model.NewPushContext(),
}
env.Watcher = mesh.NewFixedWatcher(mesh.DefaultMeshConfig())
env.PushContext.Mesh = env.Watcher.Mesh()
env.Init()
ds := NewDiscoveryServer(env, "istiod", map[string]string{})
ds.InitGenerators(env, "dubbo-system")
ds.CachesSynced()
// Config will have a fixed format:
// - aggregate store
// - one primary (local) memory config
// Additional stores can be added dynamically - for example by push or reference from a server.
// This is used to implement and test XDS federation (which is not yet final).
// In-memory config store, controller and istioConfigStore
schemas := collections.Pilot
store := memory.Make(schemas)
s := &SimpleServer{
DiscoveryServer: ds,
}
s.syncCh = make(chan string, len(schemas.All()))
configController := memory.NewController(store)
s.MemoryConfigStore = model.MakeIstioStore(configController)
// Endpoints/Clusters - using the config store for ServiceEntries
serviceControllers := aggregate.NewController(aggregate.Options{})
serviceEntryController := serviceentry.NewController(configController, s.MemoryConfigStore, ds)
serviceControllers.AddRegistry(serviceEntryController)
sd := controllermemory.NewServiceDiscovery()
sd.EDSUpdater = ds
ds.MemRegistry = sd
serviceControllers.AddRegistry(serviceregistry.Simple{
ProviderID: "Mem",
ServiceDiscovery: sd,
Controller: sd.Controller,
})
env.ServiceDiscovery = serviceControllers
go configController.Run(stop)
// configStoreCache - with HasSync interface
aggregateConfigController, err := configaggregate.MakeCache([]model.ConfigStoreController{
configController,
})
if err != nil {
log.Fatalf("Creating aggregate config: %v", err)
}
// TODO: fix the mess of store interfaces - most are too generic for their own good.
s.ConfigStoreCache = aggregateConfigController
env.ConfigStore = model.MakeIstioStore(aggregateConfigController)
return s
}
func (s *SimpleServer) StartGRPC(addr string) (string, error) {
lis, err := net.Listen("tcp", addr)
if err != nil {
return "", err
}
gs := grpc.NewServer()
s.DiscoveryServer.Register(gs)
reflection.Register(gs)
s.GRPCListener = lis
go func() {
err = gs.Serve(lis)
if err != nil {
log.Infof("Serve done with %v", err)
}
}()
return lis.Addr().String(), nil
}