// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"time"
)
import (
meshconfig "istio.io/api/mesh/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/test"
)
const (
// defaultFakeDomainSuffix is the domain suffix NewFakeControllerWithOptions
// uses when FakeControllerOptions.DomainSuffix is left empty.
defaultFakeDomainSuffix = "company.com"
)
// FakeXdsUpdater is used to test the registry. Its updater methods report what
// they were asked to do by attempting a non-blocking send of a FakeXdsEvent on
// Events; an event that the channel cannot accept at that moment is dropped.
type FakeXdsUpdater struct {
// Events tracks notifications received by the updater. NewFakeXDS creates
// it with a buffer of 100 events.
Events chan FakeXdsEvent
}
// Compile-time check that *FakeXdsUpdater satisfies model.XDSUpdater.
var _ model.XDSUpdater = (*FakeXdsUpdater)(nil)
// ConfigUpdate records an "xds" event for a config push request.
//
// The event ID is derived from the names of the keys in req.ConfigsUpdated.
// When several configs are updated at once, the lexicographically smallest
// name is reported, so the ID no longer depends on Go's unspecified map
// iteration order. A nil request, or one with no updated configs, produces an
// event with an empty ID.
//
// The send is non-blocking: if Events cannot accept the event right now, the
// event is dropped.
func (fx *FakeXdsUpdater) ConfigUpdate(req *model.PushRequest) {
	var id string
	if req != nil {
		// Ranging over a nil or empty map is a no-op, so no length check is needed.
		picked := false
		for key := range req.ConfigsUpdated {
			if !picked || key.Name < id {
				id = key.Name
				picked = true
			}
		}
	}
	select {
	case fx.Events <- FakeXdsEvent{Type: "xds", ID: id}:
	default:
		// Channel full (or nil): drop the event rather than block the caller.
	}
}
// ProxyUpdate records a "proxy" event. Both arguments are ignored, so the
// event carries no ID. The send is non-blocking; the event is dropped when
// Events cannot accept it.
func (fx *FakeXdsUpdater) ProxyUpdate(_ cluster.ID, _ string) {
	ev := FakeXdsEvent{Type: "proxy"}
	select {
	case fx.Events <- ev:
	default:
		// Nobody can take the event right now; discard it.
	}
}
// FakeXdsEvent is the record a FakeXdsUpdater publishes on its Events channel
// for each updater call it observes.
type FakeXdsEvent struct {
// Type of the event. The updater methods in this file emit "xds", "proxy",
// "eds", "eds cache", "service" and "removeShard".
Type string
// The id of the event: a config name for "xds", the hostname for "eds",
// "eds cache" and "service", the shard key string for "removeShard", and
// empty for "proxy".
ID string
// The endpoints associated with an EDS push if any. Only the "eds" and
// "eds cache" events set this field.
Endpoints []*model.IstioEndpoint
}
// NewFakeXDS returns a FakeXdsUpdater whose Events channel is buffered for
// 100 events, so updater calls can be observed by reading from that channel.
func NewFakeXDS() *FakeXdsUpdater {
	events := make(chan FakeXdsEvent, 100)
	return &FakeXdsUpdater{Events: events}
}
// EDSUpdate records an "eds" event for hostname carrying the given endpoints.
// Calls with no endpoints are ignored. The shard key and the third argument
// are unused. The send is non-blocking; the event is dropped when Events
// cannot accept it.
func (fx *FakeXdsUpdater) EDSUpdate(_ model.ShardKey, hostname string, _ string, entry []*model.IstioEndpoint) {
	if len(entry) == 0 {
		return
	}
	ev := FakeXdsEvent{Type: "eds", ID: hostname, Endpoints: entry}
	select {
	case fx.Events <- ev:
	default:
	}
}
// EDSCacheUpdate records an "eds cache" event for hostname carrying the given
// endpoints. Nothing is recorded when entry is empty. The shard key and the
// third argument are unused. The send is non-blocking; the event is dropped
// when Events cannot accept it.
func (fx *FakeXdsUpdater) EDSCacheUpdate(_ model.ShardKey, hostname, _ string, entry []*model.IstioEndpoint) {
	if len(entry) == 0 {
		return
	}
	cacheEvent := FakeXdsEvent{
		Type:      "eds cache",
		ID:        hostname,
		Endpoints: entry,
	}
	select {
	case fx.Events <- cacheEvent:
	default:
	}
}
// SvcUpdate is called when a service port mapping definition is updated.
// This interface is WIP - labels, annotations and other changes to a service
// may be updated to force an EDS and CDS recomputation and incremental push,
// as it doesn't affect LDS/RDS.
//
// The fake only records a "service" event whose ID is hostname; the shard key,
// the third argument and the model.Event are ignored. The send is
// non-blocking, so the event is dropped when Events cannot accept it.
func (fx *FakeXdsUpdater) SvcUpdate(_ model.ShardKey, hostname string, _ string, _ model.Event) {
	svcEvent := FakeXdsEvent{Type: "service", ID: hostname}
	select {
	case fx.Events <- svcEvent:
	default:
	}
}
// RemoveShard records a "removeShard" event whose ID is the string form of
// shardKey. The send is non-blocking; the event is dropped when Events cannot
// accept it.
func (fx *FakeXdsUpdater) RemoveShard(shardKey model.ShardKey) {
	removed := FakeXdsEvent{
		Type: "removeShard",
		ID:   shardKey.String(),
	}
	select {
	case fx.Events <- removed:
	default:
	}
}
// WaitOrFail waits for an event of type et using a five second timeout and
// fails t if none is seen. It delegates to WaitForDurationOrFail.
func (fx *FakeXdsUpdater) WaitOrFail(t test.Failer, et string) *FakeXdsEvent {
	const timeout = 5 * time.Second
	return fx.WaitForDurationOrFail(t, et, timeout)
}
// WaitForDurationOrFail waits for an event of type et via WaitForDuration and
// returns it. If WaitForDuration reports a timeout (nil), t.Fatalf is called;
// should Fatalf return, the nil event is returned to the caller.
func (fx *FakeXdsUpdater) WaitForDurationOrFail(t test.Failer, et string, d time.Duration) *FakeXdsEvent {
	if found := fx.WaitForDuration(et, d); found != nil {
		return found
	}
	t.Fatalf("Timeout creating %q after %s", et, d)
	return nil
}
// Wait waits for an event of type et with a five second timeout. It returns
// whatever WaitForDuration returns, which is nil on timeout.
func (fx *FakeXdsUpdater) Wait(et string) *FakeXdsEvent {
	const timeout = 5 * time.Second
	return fx.WaitForDuration(et, timeout)
}
// WaitForDuration reads from Events until an event of type et arrives or d
// has elapsed, whichever comes first. Events of any other type that are read
// while waiting are discarded.
//
// It returns a pointer to the matching event, or nil on timeout. The timeout
// covers the whole call: a single timer is started up front, so a stream of
// non-matching events cannot postpone the deadline.
func (fx *FakeXdsUpdater) WaitForDuration(et string, d time.Duration) *FakeXdsEvent {
	deadline := time.NewTimer(d)
	defer deadline.Stop()
	for {
		select {
		case e := <-fx.Events:
			if e.Type == et {
				return &e
			}
			// Not the event we are waiting for; keep draining.
		case <-deadline.C:
			return nil
		}
	}
}
// Clear discards every event currently queued on Events and returns as soon
// as a receive would block.
func (fx *FakeXdsUpdater) Clear() {
	for {
		select {
		case <-fx.Events:
			// Dropped; look for the next queued event.
		default:
			return
		}
	}
}
// FakeControllerOptions carries the inputs for NewFakeControllerWithOptions.
// Fields left at their zero value are either defaulted by that constructor or
// passed through unchanged, as noted per field.
type FakeControllerOptions struct {
// Client is the kube client handed to NewController; when nil,
// kubelib.NewFakeClient() is used.
Client kubelib.Client
// NetworksWatcher is copied into Options.NetworksWatcher as-is.
NetworksWatcher mesh.NetworksWatcher
// MeshWatcher is copied into Options.MeshWatcher and used as the aggregate
// controller's MeshHolder; when nil, a fixed watcher over an empty
// MeshConfig is used.
MeshWatcher mesh.Watcher
// ServiceHandler, when non-nil, is registered on the controller through
// AppendServiceHandler.
ServiceHandler func(service *model.Service, event model.Event)
// Mode is copied into Options.EndpointMode.
Mode EndpointMode
// ClusterID is copied into Options.ClusterID.
ClusterID cluster.ID
// NOTE(review): WatchedNamespaces is not read by NewFakeControllerWithOptions
// as shown in this file - confirm whether it is still needed.
WatchedNamespaces string
// DomainSuffix is copied into Options.DomainSuffix; when empty,
// defaultFakeDomainSuffix is used.
DomainSuffix string
// XDSUpdater is copied into Options.XDSUpdater; when nil, NewFakeXDS() is
// used.
XDSUpdater model.XDSUpdater
// DiscoveryNamespacesFilter is copied into Options.DiscoveryNamespacesFilter.
DiscoveryNamespacesFilter filter.DiscoveryNamespacesFilter
// Stop becomes the controller's stop channel and is passed to
// Client.RunAndWait; when nil, a new channel is created.
Stop chan struct{}
}
// FakeController is the controller value returned by
// NewFakeControllerWithOptions. It embeds *Controller, so all of the
// Controller's methods are available on it directly.
type FakeController struct {
*Controller
}
// NewFakeControllerWithOptions builds a Controller from opts for use in tests
// and returns it wrapped in a FakeController.
//
// Unset options are defaulted: the XDS updater to NewFakeXDS(), the domain
// suffix to defaultFakeDomainSuffix, the client to kubelib.NewFakeClient(),
// the mesh watcher to a fixed watcher over an empty MeshConfig, and the stop
// channel to a freshly made one. The controller is added as a registry to a
// new aggregate controller, opts.ServiceHandler is appended when set, and the
// client's RunAndWait is invoked with the stop channel before returning.
//
// The second return value is the XDS updater in use when it is a
// *FakeXdsUpdater; it is nil when the caller supplied some other
// model.XDSUpdater implementation.
func NewFakeControllerWithOptions(opts FakeControllerOptions) (*FakeController, *FakeXdsUpdater) {
	updater := opts.XDSUpdater
	if updater == nil {
		updater = NewFakeXDS()
	}

	suffix := opts.DomainSuffix
	if suffix == "" {
		suffix = defaultFakeDomainSuffix
	}

	client := opts.Client
	if client == nil {
		client = kubelib.NewFakeClient()
	}

	watcher := opts.MeshWatcher
	if watcher == nil {
		watcher = mesh.NewFixedWatcher(&meshconfig.MeshConfig{})
	}

	registries := aggregate.NewController(aggregate.Options{MeshHolder: watcher})
	c := NewController(client, Options{
		DomainSuffix:              suffix,
		XDSUpdater:                updater,
		Metrics:                   &model.Environment{},
		NetworksWatcher:           opts.NetworksWatcher,
		MeshWatcher:               watcher,
		EndpointMode:              opts.Mode,
		ClusterID:                 opts.ClusterID,
		DiscoveryNamespacesFilter: opts.DiscoveryNamespacesFilter,
		MeshServiceController:     registries,
	})
	registries.AddRegistry(c)

	if handler := opts.ServiceHandler; handler != nil {
		c.AppendServiceHandler(handler)
	}

	// Use the caller's stop channel when given, otherwise a private one.
	stop := opts.Stop
	if stop == nil {
		stop = make(chan struct{})
	}
	c.stop = stop
	client.RunAndWait(c.stop)

	// A failed assertion leaves fake nil, which is what callers that passed
	// their own updater get back.
	fake, _ := updater.(*FakeXdsUpdater)
	return &FakeController{Controller: c}, fake
}