| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package apiclient |
| |
| import ( |
| "context" |
| "os" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbo-go-pixiu/pixiu-api/pkg/xds" |
| xdspb "github.com/dubbo-go-pixiu/pixiu-api/pkg/xds/model" |
| clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" |
| envoyconfigcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" |
| listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" |
| discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
| extensionpb "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" |
| "github.com/envoyproxy/go-control-plane/pkg/resource/v3" |
| "github.com/pkg/errors" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/types/known/anypb" |
| "google.golang.org/protobuf/types/known/structpb" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pkg/logger" |
| "github.com/apache/dubbo-go-pixiu/pkg/model" |
| ) |
| |
| type GrpcApiClientOption func(*AggGrpcApiClient) |
| |
| func WithIstioService(serviceNames ...string) GrpcApiClientOption { |
| return func(g *AggGrpcApiClient) { |
| for _, name := range serviceNames { |
| g.dubboServiceFilter[name] = struct{}{} |
| } |
| } |
| } |
| |
| // CreateEnvoyGrpcApiClient create Grpc type ApiClient |
| func CreateEnvoyGrpcApiClient( |
| config *model.ApiConfigSource, |
| node *model.Node, |
| exitCh chan struct{}, |
| typeName ResourceTypeName, |
| opts ...GrpcApiClientOption, |
| ) *AggGrpcApiClient { |
| v := &AggGrpcApiClient{} |
| v.config = *config |
| v.node = node |
| v.grpcMg = grpcMg |
| v.exitCh = exitCh |
| switch typeName { |
| case xds.ListenerType: |
| v.typeUrl = resource.ListenerType |
| case xds.ClusterType: |
| v.typeUrl = resource.ClusterType |
| case xds.EndpointType: |
| v.typeUrl = resource.EndpointType |
| default: |
| logger.Warnf("typeName should be dubbo-go.pixiu/v1/discovery:cluster or dubbo-go.pixiu/v1/discovery:listener") |
| v.typeUrl = resource.ClusterType |
| } |
| |
| v.dubboServiceFilter = make(map[string]struct{}, 10) |
| for _, fn := range opts { |
| fn(v) |
| } |
| v.init() |
| return v |
| } |
| |
| type ( |
| AggGrpcApiClient struct { |
| GrpcExtensionApiClient |
| xDSAggClient discoverypb.AggregatedDiscoveryServiceClient |
| dubboServiceFilter map[string]struct{} // the service name to filter |
| } |
| discoveryResponseHandler func(any2 []*anypb.Any) |
| ) |
| |
| func (g *AggGrpcApiClient) init() { |
| if len(g.config.ClusterName) == 0 { |
| panic("should config one cluster at least") |
| } |
| //todo implement multiple grpc api services |
| if len(g.config.ClusterName) > 1 { |
| logger.Warn("defined multiple cluster for xDS api services but only one support.") |
| } |
| cluster, err := g.grpcMg.GetGrpcCluster(g.config.ClusterName[0]) |
| |
| if err != nil { |
| logger.Errorf("get cluster for init error. error=%v", err) |
| panic(err) |
| } |
| conn, err := cluster.GetConnection() |
| if err != nil { |
| panic(err) |
| } |
| g.xDSExtensionClient = extensionpb.NewExtensionConfigDiscoveryServiceClient(conn) |
| g.xDSAggClient = discoverypb.NewAggregatedDiscoveryServiceClient(conn) |
| |
| } |
| |
| func (g *AggGrpcApiClient) Fetch(_ string) ([]*ProtoAny, error) { |
| //un-support fetch |
| return nil, nil |
| } |
| |
| func (g *AggGrpcApiClient) Delta() (chan *DeltaResources, error) { |
| outputCh := make(chan *DeltaResources) |
| return outputCh, g.pipeline(outputCh) |
| } |
| |
| type refEndpoint struct { |
| IsPending bool |
| RawProto proto.Message |
| } |
| |
| func (g *AggGrpcApiClient) pipeline(output chan *DeltaResources) error { |
| // all endpoint refer cluster or listener. map[type]map[resource name]endpoint info |
| edsResources := make(map[resource.Type]map[string]refEndpoint) |
| req := g.makeDiscoveryRequest(g.resourceNames, g.typeUrl) |
| var handler discoveryResponseHandler |
| switch g.typeUrl { |
| case resource.ListenerType: |
| handler = func(any2 []*anypb.Any) { |
| for _, res := range any2 { |
| l := listenerpb.Listener{} |
| if err := res.UnmarshalTo(&l); err != nil { |
| logger.Warnf("can not decode source %s , %v", res.TypeUrl, res) |
| continue |
| } |
| } |
| } |
| case resource.ClusterType: |
| handler = func(any2 []*anypb.Any) { |
| // only one goroutine handle response, no need to lock for local var. |
| for _, res := range any2 { |
| logger.Infof("new resource found %s", res.TypeUrl) |
| c := clusterpb.Cluster{} |
| if err := res.UnmarshalTo(&c); err != nil { |
| logger.Warnf("can not decode source %s , %v", res.TypeUrl, res) |
| continue |
| } |
| //needn't lock edsResources |
| g.getClusterResourceReference(&c, edsResources) |
| } |
| |
| pendingResourceNames := make([]string, 0) |
| clusterRefEndpoints := edsResources[resource.ClusterType] |
| // list all pending pendingResourceNames |
| for name, b := range clusterRefEndpoints { |
| if b.IsPending { |
| pendingResourceNames = append(pendingResourceNames, name) |
| } |
| } |
| |
| // do not block, watch new resource at another goroutine |
| err := g.runEndpointReferences(pendingResourceNames, func(any2 []*anypb.Any) { |
| // run on another goroutine |
| extCluster := xdspb.PixiuExtensionClusters{ |
| Clusters: []*xdspb.Cluster{ |
| { |
| Name: "", |
| TypeStr: xds.ClusterType, |
| Type: 0, |
| EdsClusterConfig: nil, |
| LbStr: "", |
| Lb: 0, |
| HealthChecks: nil, |
| Endpoints: make([]*xdspb.Endpoint, 0, len(any2)), |
| }, |
| }, |
| } |
| |
| for _, one := range any2 { |
| l := endpointpb.ClusterLoadAssignment{} |
| if err := one.UnmarshalTo(&l); err != nil { |
| logger.Warnf("unmarshal error", err) |
| continue |
| } |
| c := clusterRefEndpoints[l.ClusterName].RawProto.(*clusterpb.Cluster) |
| extCluster.Clusters[0].Name = g.readServiceNameOfCluster(c) |
| |
| for _, ep := range l.Endpoints { |
| address := ep.LbEndpoints[0].GetEndpoint().GetAddress().GetSocketAddress() |
| extCluster.Clusters[0].Endpoints = append(extCluster.Clusters[0].Endpoints, &xdspb.Endpoint{ |
| Id: "", |
| Name: "", |
| Address: &xdspb.SocketAddress{ |
| Address: address.Address, |
| Port: int64(address.GetPortValue()), |
| }, |
| Metadata: nil, |
| }) |
| } |
| } |
| |
| //make output |
| output <- &DeltaResources{ |
| NewResources: []*ProtoAny{ |
| { |
| typeConfig: &envoyconfigcorev3.TypedExtensionConfig{ |
| Name: "cluster", //todo cluster name |
| TypedConfig: func() *anypb.Any { //make any.Any from extCluster |
| a, err := anypb.New(&extCluster) |
| if err != nil { |
| logger.Warnf("can not make anypb.Any %v", err) |
| return nil |
| } |
| return a |
| }(), |
| }, |
| }, |
| }, |
| RemovedResource: nil, |
| } |
| }) |
| if err != nil { //todo retry |
| logger.Errorf("can not run reference request %v", err) |
| } |
| } |
| |
| default: |
| return errors.Errorf("nedd listenerType of clusterType but get %s", g.typeUrl) |
| } |
| |
| if err := g.runDelta(req, handler); err != nil { |
| return errors.WithMessagef(err, "start run %s failed", req.TypeUrl) |
| } |
| |
| return nil |
| } |
| |
| // readServiceNameOfCluster get service name of k8s |
| func (g *AggGrpcApiClient) readServiceNameOfCluster(c *clusterpb.Cluster) string { |
| if c.Metadata == nil { |
| return "" |
| } |
| return c. |
| Metadata. |
| FilterMetadata["istio"]. |
| Fields["services"]. |
| GetListValue(). |
| GetValues()[0]. |
| GetStructValue(). |
| Fields["name"]. |
| GetStringValue() |
| } |
| |
| // request EDS for the allResourceNames |
| func (g *AggGrpcApiClient) runEndpointReferences(allResourceNames []string, |
| output discoveryResponseHandler) (err error) { |
| |
| //todo reload all request |
| req := g.makeDiscoveryRequest(allResourceNames, resource.EndpointType) |
| if err := g.runDelta(req, output); err != nil { |
| return errors.WithMessagef(err, "start run %s failed", req.TypeUrl) |
| } |
| return nil |
| } |
| |
| // runDelta start 2 goroutine to and watch change |
| func (g *AggGrpcApiClient) runDelta(req *discoverypb.DiscoveryRequest, output discoveryResponseHandler) error { |
| var delta discoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesClient |
| var cancel context.CancelFunc |
| var xState xdsState |
| //read resource list |
| backoff := func() { |
| xState = xdsState{} |
| for { |
| //back off |
| var err error |
| var ctx context.Context // context to sync exitCh |
| ctx, cancel = context.WithCancel(context.TODO()) |
| delta, err = g.sendInitAggRequest(ctx, req, &xState) |
| if err != nil { |
| logger.Error("can not receive delta discovery request, will back off 1 sec later", err) |
| select { |
| case <-time.After(1 * time.Second): |
| case <-g.exitCh: |
| logger.Infof("get close single.") |
| return |
| } |
| continue //backoff |
| } |
| return //success |
| } |
| } |
| |
| backoff() |
| if delta == nil { // delta instance not created because exitCh |
| return nil |
| } |
| go func() { |
| //waiting exitCh close |
| for range g.exitCh { |
| } |
| cancel() |
| }() |
| //get message |
| go func() { |
| for { // delta response backoff. |
| for { //loop consume receive data form xds server(sendInitDeltaRequest) |
| resp, err := delta.Recv() |
| if err != nil { |
| logger.Error("can not receive delta discovery request", err) |
| break |
| } |
| g.handleDeltaResponse(resp, &xState, output) |
| } |
| backoff() |
| } |
| }() |
| |
| return nil |
| } |
| |
| func (g *AggGrpcApiClient) handleDeltaResponse(resp *discoverypb.DiscoveryResponse, xdsState *xdsState, handler discoveryResponseHandler) { |
| // save the xds state |
| xdsState.deltaVersion = make(map[string]string, 1) |
| xdsState.nonce = resp.Nonce |
| xdsState.versionInfo = resp.VersionInfo |
| handler(resp.Resources) |
| //notify the resource change handler |
| //output <- resources |
| } |
| |
| func (g *AggGrpcApiClient) sendInitAggRequest(ctx context.Context, req *discoverypb.DiscoveryRequest, xState *xdsState) (stream discoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesClient, err error) { |
| req.VersionInfo = xState.versionInfo |
| req.ResponseNonce = xState.nonce |
| stream, err = g.xDSAggClient.StreamAggregatedResources(ctx) |
| if err != nil { |
| return nil, errors.Wrapf(err, "fetch dynamic resource from remote error. %s", g.resourceNames) |
| } |
| |
| err = stream.Send(req) |
| if err != nil { |
| return nil, errors.Wrapf(err, "fetch dynamic resource from remote error. %s", g.resourceNames) |
| } |
| return |
| } |
| |
| func (g *AggGrpcApiClient) makeDiscoveryRequest(resources []string, |
| typeUrl string, |
| ) *discoverypb.DiscoveryRequest { |
| return &discoverypb.DiscoveryRequest{ |
| //VersionInfo: xdsState.versionInfo, |
| Node: g.makeNode(), |
| ResourceNames: resources, //[]string{"outbound|20000||dubbo-go-app.default.svc.cluster.local"}, |
| TypeUrl: typeUrl, //"type.googleapis.com/envoy.config.listener.v3.Listener", |
| //ResponseNonce: xdsState.nonce, |
| ErrorDetail: nil, |
| } |
| } |
| |
| func (g *AggGrpcApiClient) makeNode() *envoyconfigcorev3.Node { |
| podId := os.Getenv("POD_IP") |
| if len(podId) == 0 { |
| logger.Warnf("expect POD_ID env") |
| podId = "0.0.0.0" |
| } |
| podName := os.Getenv("POD_NAME") |
| if len(podName) == 0 { |
| logger.Warnf("expect POD_NAME env") |
| podName = "pixiu-gateway" |
| } |
| nsName := os.Getenv("POD_NAMESPACE") |
| if len(nsName) == 0 { |
| logger.Warnf("expect POD_NAMESPACE env") |
| nsName = "default" |
| } |
| |
| return &envoyconfigcorev3.Node{ |
| Id: "sidecar~" + podId + "~" + podName + "." + nsName + ".svc.cluster.local", |
| UserAgentName: "pixiu", |
| Cluster: "testCluster", |
| UserAgentVersionType: &envoyconfigcorev3.Node_UserAgentVersion{UserAgentVersion: "1.45.0"}, |
| ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning"}, |
| Metadata: &structpb.Struct{ |
| Fields: map[string]*structpb.Value{ |
| "CLUSTER_ID": { |
| Kind: &structpb.Value_StringValue{StringValue: "Kubernetes"}, |
| }, |
| "LABELS": { |
| Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{ |
| Fields: map[string]*structpb.Value{}, |
| }}, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| // getClusterResourceReference get resources of cluster |
| func (g *AggGrpcApiClient) getClusterResourceReference(c *clusterpb.Cluster, edsResources map[resource.Type]map[string]refEndpoint) { |
| logger.Infof("cluster name ==>%s", c.Name) |
| if _, exist := g.dubboServiceFilter[g.readServiceNameOfCluster(c)]; !exist { |
| logger.Infof("cluster name ==>%v", c) |
| return |
| } |
| |
| switch typ := c.ClusterDiscoveryType.(type) { |
| case *clusterpb.Cluster_Type: |
| if typ.Type == clusterpb.Cluster_EDS { |
| name := c.Name |
| if c.EdsClusterConfig != nil && c.EdsClusterConfig.ServiceName != "" { |
| name = c.EdsClusterConfig.ServiceName |
| } |
| |
| if _, ok := edsResources[resource.ClusterType]; !ok { |
| edsResources[resource.ClusterType] = make(map[string]refEndpoint) |
| } |
| |
| edsResources[resource.ClusterType][name] = refEndpoint{ |
| IsPending: true, |
| RawProto: c, |
| } |
| } else { |
| logger.Infof("cluster type %s not supported", typ.Type.String()) |
| } |
| } |
| } |