blob: 5ce125a0143fffe16031b4148fb100c6de753dbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package model
import (
"encoding/json"
"fmt"
networkutil "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/network"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/maps"
pm "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/pkg/xds"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/types/known/structpb"
meshconfig "istio.io/api/mesh/v1alpha1"
)
const (
serviceNodeSeparator = "~"
)
const (
IPv4 = pm.IPv4
IPv6 = pm.IPv6
Dual = pm.Dual
)
type XdsResourceGenerator interface {
Generate(proxy *Proxy, w *WatchedResource, req *PushRequest) (Resources, XdsLogDetails, error)
}
type XdsDeltaResourceGenerator interface {
XdsResourceGenerator
GenerateDeltas(proxy *Proxy, req *PushRequest, w *WatchedResource) (Resources, DeletedResources, XdsLogDetails, bool, error)
}
type (
Node = pm.Node
NodeMetadata = pm.NodeMetadata
BootstrapNodeMetadata = pm.BootstrapNodeMetadata
NodeType = pm.NodeType
IPMode = pm.IPMode
)
type Watcher = meshwatcher.WatcherCollection
type WatchedResource = xds.WatchedResource
type Environment struct {
ServiceDiscovery
Watcher
ConfigStore
mutex sync.RWMutex
pushContext *PushContext
NetworksWatcher mesh.NetworksWatcher
NetworkManager *NetworkManager
clusterLocalServices ClusterLocalProvider
DomainSuffix string
EndpointIndex *EndpointIndex
Cache XdsCache
}
func NewEnvironment() *Environment {
var cache XdsCache
if features.EnableXDSCaching {
cache = NewXdsCache()
} else {
cache = DisabledCache{}
}
return &Environment{
pushContext: NewPushContext(),
Cache: cache,
EndpointIndex: NewEndpointIndex(cache),
}
}
func NewEndpointIndex(cache XdsCache) *EndpointIndex {
return &EndpointIndex{
shardsBySvc: make(map[string]map[string]*EndpointShards),
cache: cache,
}
}
type XdsLogDetails struct {
Incremental bool
AdditionalInfo string
}
var DefaultXdsLogDetails = XdsLogDetails{}
type Resources = []*discovery.Resource
type DeletedResources = []string
var _ mesh.Holder = &Environment{}
func (e *Environment) PushContext() *PushContext {
e.mutex.RLock()
defer e.mutex.RUnlock()
return e.pushContext
}
func (e *Environment) SetPushContext(pc *PushContext) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.pushContext = pc
}
func (e *Environment) Mesh() *meshconfig.MeshConfig {
if e != nil && e.Watcher != nil {
return e.Watcher.Mesh()
}
return nil
}
func (e *Environment) MeshNetworks() *meshconfig.MeshNetworks {
if e != nil && e.NetworksWatcher != nil {
return e.NetworksWatcher.Networks()
}
return nil
}
func (e *Environment) AddMeshHandler(h func()) {
if e != nil && e.Watcher != nil {
e.Watcher.AddMeshHandler(h)
}
}
func (e *Environment) GetDiscoveryAddress() (host.Name, string, error) {
proxyConfig := mesh.DefaultProxyConfig()
if e.Mesh().DefaultConfig != nil {
proxyConfig = e.Mesh().DefaultConfig
}
hostname, port, err := net.SplitHostPort(proxyConfig.DiscoveryAddress)
if err != nil {
return "", "", fmt.Errorf("invalid Dubbod Address: %s, %v", proxyConfig.DiscoveryAddress, err)
}
if _, err := strconv.Atoi(port); err != nil {
return "", "", fmt.Errorf("invalid Dubbod Port: %s, %s, %v", port, proxyConfig.DiscoveryAddress, err)
}
return host.Name(hostname), port, nil
}
func (e *Environment) GetProxyConfigOrDefault(ns string, labels, annotations map[string]string, meshConfig *meshconfig.MeshConfig) *meshconfig.ProxyConfig {
return mesh.DefaultProxyConfig()
}
func (e *Environment) ClusterLocal() ClusterLocalProvider {
return e.clusterLocalServices
}
func (e *Environment) Init() {
// Use a default DomainSuffix, if none was provided.
if len(e.DomainSuffix) == 0 {
e.DomainSuffix = constants.DefaultClusterLocalDomain
}
e.clusterLocalServices = NewClusterLocalProvider(e)
}
func (e *Environment) InitNetworksManager(updater XDSUpdater) (err error) {
e.NetworkManager, err = NewNetworkManager(e, updater)
return
}
type Proxy struct {
sync.RWMutex
XdsResourceGenerator XdsResourceGenerator
LastPushContext *PushContext
LastPushTime time.Time
WatchedResources map[string]*WatchedResource
ID string
DNSDomain string
Metadata *NodeMetadata
IPAddresses []string
XdsNode *core.Node
ConfigNamespace string
ServiceTargets []ServiceTarget
ipMode IPMode
GlobalUnicastIP string
}
func (node *Proxy) GetWatchedResource(typeURL string) *WatchedResource {
node.RLock()
defer node.RUnlock()
return node.WatchedResources[typeURL]
}
func (node *Proxy) DeleteWatchedResource(typeURL string) {
node.Lock()
defer node.Unlock()
delete(node.WatchedResources, typeURL)
}
func (node *Proxy) NewWatchedResource(typeURL string, names []string) {
node.Lock()
defer node.Unlock()
node.WatchedResources[typeURL] = &WatchedResource{TypeUrl: typeURL, ResourceNames: sets.New(names...)}
}
func (node *Proxy) GetID() string {
if node == nil {
return ""
}
return node.ID
}
func (node *Proxy) DiscoverIPMode() {
node.ipMode = pm.DiscoverIPMode(node.IPAddresses)
node.GlobalUnicastIP = networkutil.GlobalUnicastIP(node.IPAddresses)
}
func (node *Proxy) UpdateWatchedResource(typeURL string, updateFn func(*WatchedResource) *WatchedResource) {
node.Lock()
defer node.Unlock()
r := node.WatchedResources[typeURL]
r = updateFn(r)
if r != nil {
node.WatchedResources[typeURL] = r
} else {
delete(node.WatchedResources, typeURL)
}
}
func (node *Proxy) IsProxylessGrpc() bool {
return node.Metadata != nil && node.Metadata.Generator == "grpc"
}
func (node *Proxy) ShouldUpdateServiceTargets(updates sets.Set[ConfigKey]) bool {
// we only care for services which can actually select this proxy
for config := range updates {
if config.Kind == kind.ServiceEntry || config.Namespace == node.Metadata.Namespace {
return true
}
}
return false
}
func (node *Proxy) SetServiceTargets(serviceDiscovery ServiceDiscovery) {
instances := serviceDiscovery.GetProxyServiceTargets(node)
// Keep service instances in order of creation/hostname.
sort.SliceStable(instances, func(i, j int) bool {
if instances[i].Service != nil && instances[j].Service != nil {
if !instances[i].Service.CreationTime.Equal(instances[j].Service.CreationTime) {
return instances[i].Service.CreationTime.Before(instances[j].Service.CreationTime)
}
// Additionally, sort by hostname just in case services created automatically at the same second.
return instances[i].Service.Hostname < instances[j].Service.Hostname
}
return true
})
node.ServiceTargets = instances
}
func (node *Proxy) ShallowCloneWatchedResources() map[string]*WatchedResource {
node.RLock()
defer node.RUnlock()
return maps.Clone(node.WatchedResources)
}
func ParseMetadata(metadata *structpb.Struct) (*NodeMetadata, error) {
if metadata == nil {
return &NodeMetadata{}, nil
}
bootstrapNodeMeta, err := ParseBootstrapNodeMetadata(metadata)
if err != nil {
return nil, err
}
return &bootstrapNodeMeta.NodeMetadata, nil
}
func ParseBootstrapNodeMetadata(metadata *structpb.Struct) (*BootstrapNodeMetadata, error) {
if metadata == nil {
return &BootstrapNodeMetadata{}, nil
}
b, err := protomarshal.MarshalProtoNames(metadata)
if err != nil {
return nil, fmt.Errorf("failed to read node metadata %v: %v", metadata, err)
}
meta := &BootstrapNodeMetadata{}
if err := json.Unmarshal(b, meta); err != nil {
return nil, fmt.Errorf("failed to unmarshal node metadata (%v): %v", string(b), err)
}
return meta, nil
}
func ParseServiceNodeWithMetadata(nodeID string, metadata *NodeMetadata) (*Proxy, error) {
parts := strings.Split(nodeID, serviceNodeSeparator)
out := &Proxy{
Metadata: metadata,
}
if len(parts) != 4 {
return out, fmt.Errorf("missing parts in the service node %q (expected 4 parts, got %d)", nodeID, len(parts))
}
// Extract IP address from parts[1] (format: type~ip~id~domain)
// Validate and set IP address
if len(parts[1]) > 0 {
ip := net.ParseIP(parts[1])
if ip != nil {
out.IPAddresses = []string{parts[1]}
}
}
// If IP address is empty in node ID, we still need to validate it
// For proxyless gRPC, IP address should be set, but we'll allow it to be set later
// if it's empty, we'll set it from pod IP when ServiceTargets are computed
if len(out.IPAddresses) == 0 {
// IP address will be set later when ServiceTargets are computed from pod IP
// For now, we allow empty IP for proxyless nodes to avoid failing initialization
// The IP will be populated when GetProxyServiceTargets is called
out.IPAddresses = []string{}
}
out.ID = parts[2]
out.DNSDomain = parts[3]
// Validate that ID is not empty - this is critical for proxyless gRPC
if len(out.ID) == 0 {
return out, fmt.Errorf("node ID is empty in service node %q (parts[2] is empty)", nodeID)
}
return out, nil
}
func GetProxyConfigNamespace(proxy *Proxy) string {
if proxy == nil {
return ""
}
// First look for DUBBO_META_CONFIG_NAMESPACE
// All newer proxies (from Istio 1.1 onwards) are supposed to supply this
if len(proxy.Metadata.Namespace) > 0 {
return proxy.Metadata.Namespace
}
// if not found, for backward compatibility, extract the namespace from
// the proxy domain. this is a k8s specific hack and should be enabled
parts := strings.Split(proxy.DNSDomain, ".")
if len(parts) > 1 { // k8s will have namespace.<domain>
return parts[0]
}
return ""
}