blob: 1403407fd1f0ed25e7c1627c2d31f1f8d185ea49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api"
xdspb "github.com/dubbo-go-pixiu/pixiu-api/pkg/xds/model"
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config/xds/apiclient"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/server/controls"
)
type CdsManager struct {
DiscoverApi
clusterMg controls.ClusterManager
}
// Fetch overwrite DiscoverApi.Fetch.
func (c *CdsManager) Fetch() error {
r, err := c.DiscoverApi.Fetch("") //todo use local version
if err != nil {
return err
}
clusters := make([]*xdspb.Cluster, 0, len(r))
for _, one := range r {
extClusters := &xdspb.PixiuExtensionClusters{}
if err := one.To(extClusters); err != nil {
logger.Errorf("unknown resource of %s, expect Listener", one.GetName())
continue
}
logger.Infof("clusters from xds server %v", extClusters)
clusters = append(clusters, extClusters.Clusters...)
}
return c.setupCluster(clusters)
}
func (c *CdsManager) Delta() error {
readCh, err := c.DiscoverApi.Delta()
if err != nil {
return err
}
go c.asyncHandler(readCh)
return nil
}
func (c *CdsManager) asyncHandler(read chan *apiclient.DeltaResources) {
for one := range read {
clusters := make([]*xdspb.Cluster, 0, len(one.NewResources))
for _, one := range one.NewResources {
cluster := &xdspb.PixiuExtensionClusters{}
if err := one.To(cluster); err != nil {
logger.Errorf("unknown resource of %s, expect Listener", one.GetName())
continue
}
logger.Infof("clusters from xds server %v", cluster)
clusters = append(clusters, cluster.Clusters...)
}
if err := c.setupCluster(clusters); err != nil {
logger.Errorf("can not setup cluster.", err)
}
}
}
func (c *CdsManager) removeCluster(clusterNames []string) {
c.clusterMg.RemoveCluster(clusterNames)
}
func (c *CdsManager) setupCluster(clusters []*xdspb.Cluster) error {
laterApplies := make([]func() error, 0, len(clusters))
toRemoveHash := make(map[string]struct{}, len(clusters))
store, err := c.clusterMg.CloneXdsControlStore()
if err != nil {
return errors.WithMessagef(err, "can not clone cluster store when update cluster")
}
//todo this will remove the cluster which defined locally.
for _, cluster := range store.Config() {
toRemoveHash[cluster.Name] = struct{}{}
}
for _, cluster := range clusters {
delete(toRemoveHash, cluster.Name)
makeCluster := c.makeCluster(cluster)
switch {
case c.clusterMg.HasCluster(cluster.Name):
laterApplies = append(laterApplies, func() error {
c.clusterMg.UpdateCluster(makeCluster)
return nil
})
default:
laterApplies = append(laterApplies, func() error {
c.clusterMg.AddCluster(makeCluster)
return nil
})
}
}
c.removeClusters(toRemoveHash)
for _, fn := range laterApplies { //do update and add new cluster.
if err := fn(); err != nil {
logger.Errorf("can not modify cluster", err)
}
}
return nil
}
func (c *CdsManager) removeClusters(toRemoveList map[string]struct{}) {
removeClusters := make([]string, 0, len(toRemoveList))
for clusterName := range toRemoveList {
removeClusters = append(removeClusters, clusterName)
}
if len(toRemoveList) == 0 {
return
}
c.removeCluster(removeClusters)
}
func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.ClusterConfig {
return &model.ClusterConfig{
Name: cluster.Name,
TypeStr: cluster.TypeStr,
Type: c.makeClusterType(cluster),
EdsClusterConfig: c.makeEdsClusterConfig(cluster.EdsClusterConfig),
LbStr: c.makeLoadBalancePolicy(cluster.LbStr),
HealthChecks: c.makeHealthChecks(cluster.HealthChecks),
Endpoints: c.makeEndpoints(cluster.Endpoints),
}
}
func (c *CdsManager) makeLoadBalancePolicy(lb string) model.LbPolicyType {
return model.LbPolicyTypeValue[lb]
}
func (c *CdsManager) makeClusterType(cluster *xdspb.Cluster) model.DiscoveryType {
return model.DiscoveryTypeValue[cluster.TypeStr]
}
func (c *CdsManager) makeEndpoints(endpoints []*xdspb.Endpoint) []*model.Endpoint {
r := make([]*model.Endpoint, len(endpoints))
for i, endpoint := range endpoints {
r[i] = &model.Endpoint{
ID: endpoint.Id,
Name: endpoint.Name,
Address: c.makeAddress(endpoint),
Metadata: endpoint.Metadata,
}
}
return r
}
func (c *CdsManager) makeAddress(endpoint *xdspb.Endpoint) model.SocketAddress {
if endpoint == nil || endpoint.Address == nil {
return model.SocketAddress{}
}
return model.SocketAddress{
Address: endpoint.Address.Address,
Port: int(endpoint.Address.Port),
ResolverName: endpoint.Address.ResolverName,
Domains: endpoint.Address.Domains,
CertsDir: endpoint.Address.CertsDir,
}
}
func (c *CdsManager) makeHealthChecks(checks []*xdspb.HealthCheck) (result []model.HealthCheckConfig) {
//todo implement me after fix model.HealthCheck type define
//result = make([]model.HealthCheck, 0, len(checks))
//for _, check := range checks {
// switch one := check.GetChecker().(type) {
// case *xdspb.HealthCheck_HttpChecker:
// result = append(result, model.HttpHealthCheck{
// Host: one.HttpChecker.Host,
// Path: one.HttpChecker.Path,
// UseHttp2: one.HttpChecker.UseHttp2,
// ExpectedStatuses: one.HttpChecker.ExpectedStatuses,
// })
// case *xdspb.HealthCheck_GrpcChecker:
// result = append(result, model.GrpcHealthCheck{
// ServiceName: one.GrpcChecker.ServiceName,
// Authority: one.GrpcChecker.Authority,
// })
// case *xdspb.HealthCheck_CustomChecker:
// result = append(result, model.CustomHealthCheck{
// Name: one.CustomChecker.Name,
// Config: func() interface{} {
// if one.CustomChecker.Config == nil {
// return nil
// }
// return one.CustomChecker.Config.AsMap()
// }(),
// })
// }
//}
return
}
func (c *CdsManager) makeEdsClusterConfig(edsConfig *xdspb.EdsClusterConfig) model.EdsClusterConfig {
if edsConfig == nil {
return model.EdsClusterConfig{}
}
return model.EdsClusterConfig{
EdsConfig: model.ConfigSource{
Path: edsConfig.EdsConfig.Path,
ApiConfigSource: c.makeApiConfigSource(edsConfig.EdsConfig.ApiConfigSource),
},
ServiceName: edsConfig.ServiceName,
}
}
func (c *CdsManager) makeApiConfigSource(apiConfig *xdspb.ApiConfigSource) (result model.ApiConfigSource) {
apiType, ok := model.ApiTypeValue[apiConfig.APITypeStr]
if !ok {
logger.Errorf("unknown apiType %s", apiConfig.APITypeStr)
return
}
return model.ApiConfigSource{
APIType: api.ApiType(apiType),
APITypeStr: apiConfig.APITypeStr,
ClusterName: apiConfig.ClusterName,
RefreshDelay: apiConfig.RefreshDelay,
RequestTimeout: apiConfig.RequestTimeout,
GrpcServices: nil, //todo create node of pb
}
}