blob: 9e642d80f5aaae5bbaff930c4375a045566b4340 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"fmt"
"strings"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/types/known/structpb"
)
import (
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
"github.com/apache/dubbo-kubernetes/pkg/dds"
"github.com/apache/dubbo-kubernetes/pkg/dds/util"
)
var _ DeltaDDSStream = &stream{}
type latestReceived struct {
nonce string
nameToVersion util.NameToVersion
}
type stream struct {
streamClient DDSSyncServiceStream
latestACKed map[core_model.ResourceType]string
latestReceived map[core_model.ResourceType]*latestReceived
clientId string
cpConfig string
runtimeInfo core_runtime.RuntimeInfo
}
type DDSSyncServiceStream interface {
Send(*envoy_sd.DeltaDiscoveryRequest) error
Recv() (*envoy_sd.DeltaDiscoveryResponse, error)
}
func NewDeltaDDSStream(s DDSSyncServiceStream, clientId string, runtimeInfo core_runtime.RuntimeInfo, cpConfig string) DeltaDDSStream {
return &stream{
streamClient: s,
runtimeInfo: runtimeInfo,
latestACKed: make(map[core_model.ResourceType]string),
latestReceived: make(map[core_model.ResourceType]*latestReceived),
clientId: clientId,
cpConfig: cpConfig,
}
}
func (s *stream) DeltaDiscoveryRequest(resourceType core_model.ResourceType) error {
req := &envoy_sd.DeltaDiscoveryRequest{
ResponseNonce: "",
Node: &envoy_core.Node{
Id: s.clientId,
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
dds.MetadataFieldConfig: {Kind: &structpb.Value_StringValue{StringValue: s.cpConfig}},
dds.MetadataControlPlaneId: {Kind: &structpb.Value_StringValue{StringValue: s.runtimeInfo.GetInstanceId()}},
dds.MetadataFeatures: {Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{
Values: []*structpb.Value{
{Kind: &structpb.Value_StringValue{StringValue: dds.FeatureZoneToken}},
{Kind: &structpb.Value_StringValue{StringValue: dds.FeatureHashSuffix}},
},
}}},
},
},
},
ResourceNamesSubscribe: []string{"*"},
TypeUrl: string(resourceType),
}
return s.streamClient.Send(req)
}
func (s *stream) Receive() (UpstreamResponse, error) {
resp, err := s.streamClient.Recv()
if err != nil {
return UpstreamResponse{}, err
}
rs, nameToVersion, err := util.ToDeltaCoreResourceList(resp)
if err != nil {
return UpstreamResponse{}, err
}
// when there isn't nonce it means it's the first request
isInitialRequest := true
if _, found := s.latestACKed[rs.GetItemType()]; found {
isInitialRequest = false
}
s.latestReceived[rs.GetItemType()] = &latestReceived{
nonce: resp.Nonce,
nameToVersion: nameToVersion,
}
return UpstreamResponse{
ControlPlaneId: resp.GetControlPlane().GetIdentifier(),
Type: rs.GetItemType(),
AddedResources: rs,
RemovedResourcesKey: s.mapRemovedResources(resp.RemovedResources),
IsInitialRequest: isInitialRequest,
}, nil
}
func (s *stream) ACK(resourceType core_model.ResourceType) error {
latestReceived := s.latestReceived[resourceType]
if latestReceived == nil {
return nil
}
err := s.streamClient.Send(&envoy_sd.DeltaDiscoveryRequest{
ResponseNonce: latestReceived.nonce,
Node: &envoy_core.Node{
Id: s.clientId,
},
TypeUrl: string(resourceType),
})
if err == nil {
s.latestACKed[resourceType] = latestReceived.nonce
}
return err
}
func (s *stream) NACK(resourceType core_model.ResourceType, err error) error {
latestReceived, found := s.latestReceived[resourceType]
if !found {
return nil
}
return s.streamClient.Send(&envoy_sd.DeltaDiscoveryRequest{
ResponseNonce: latestReceived.nonce,
ResourceNamesSubscribe: []string{"*"},
TypeUrl: string(resourceType),
Node: &envoy_core.Node{
Id: s.clientId,
},
ErrorDetail: &status.Status{
Message: fmt.Sprintf("%s", err),
},
})
}
// go-contro-plane cache keeps them as a <resource_name>.<mesh_name>
func (s *stream) mapRemovedResources(removedResourceNames []string) []core_model.ResourceKey {
removed := []core_model.ResourceKey{}
for _, resourceName := range removedResourceNames {
index := strings.LastIndex(resourceName, ".")
var rk core_model.ResourceKey
if index != -1 {
rk = core_model.WithMesh(resourceName[index+1:], resourceName[:index])
} else {
rk = core_model.WithoutMesh(resourceName)
}
removed = append(removed, rk)
}
return removed
}