blob: 1d7aee61698db88b6e109ba4770c9e71fc31f58a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util
import (
"fmt"
"sort"
)
import (
"github.com/go-logr/logr"
"golang.org/x/exp/maps"
kube_core "k8s.io/api/core/v1"
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_labels "k8s.io/apimachinery/pkg/labels"
kube_types "k8s.io/apimachinery/pkg/types"
kube_intstr "k8s.io/apimachinery/pkg/util/intstr"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/plugins/runtime/k8s/metadata"
)
type ServicePredicate func(*kube_core.Service) bool
func MatchServiceThatSelectsPod(pod *kube_core.Pod, ignoredLabels []string) ServicePredicate {
return func(svc *kube_core.Service) bool {
selector := maps.Clone(svc.Spec.Selector)
for _, ignoredLabel := range ignoredLabels {
delete(selector, ignoredLabel)
}
return kube_labels.SelectorFromSet(selector).Matches(kube_labels.Set(pod.Labels))
}
}
// According to K8S docs about Service#selector:
// Route service traffic to pods with label keys and values matching this selector.
// If empty or not present, the service is assumed to have an external process managing its endpoints, which Kubernetes will not modify.
// Only applies to types ClusterIP, NodePort, and LoadBalancer. Ignored if type is ExternalName. More info: https://kubernetes.io/docs/concepts/services-networking/service/
//
// When converting Pod to Dataplane, we don't want to take into account Services that has no Selector, otherwise any Pod will match this service
// and since we just take any int target port in #util.FindPort every Dataplane in the same namespace as this service would get an extra inbound for it.
func AnySelector() ServicePredicate {
return func(svc *kube_core.Service) bool {
return len(svc.Spec.Selector) > 0
}
}
func Not(predicate ServicePredicate) ServicePredicate {
return func(svc *kube_core.Service) bool {
return !predicate(svc)
}
}
func FindServices(svcs *kube_core.ServiceList, predicates ...ServicePredicate) []*kube_core.Service {
matching := make([]*kube_core.Service, 0)
for i := range svcs.Items {
svc := &svcs.Items[i]
allMatched := true
for _, predicate := range predicates {
if !predicate(svc) {
allMatched = false
break
}
}
if allMatched {
matching = append(matching, svc)
}
}
// Sort by name so that inbound order is similar across zones regardless of the order services got created.
sort.Slice(matching, func(i, j int) bool {
return matching[i].Name < matching[j].Name
})
return matching
}
// FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
func FindPort(pod *kube_core.Pod, svcPort *kube_core.ServicePort) (int, *kube_core.Container, error) {
givenOrDefault := func(value kube_core.Protocol) kube_core.Protocol {
if value != "" {
return value
}
return kube_core.ProtocolTCP
}
portName := svcPort.TargetPort
switch portName.Type {
case kube_intstr.String:
name := portName.StrVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && givenOrDefault(port.Protocol) == givenOrDefault(svcPort.Protocol) {
return int(port.ContainerPort), &container, nil
}
}
}
case kube_intstr.Int:
// According to K8S docs about Container#ports:
// List of ports to expose from the container. Exposing a port here gives the system additional information about the network connections a container uses, but is primarily informational.
// Not specifying a port here DOES NOT prevent that port from being exposed. Any port which is listening on the default "0.0.0.0" address inside a container will be accessible from the network
//
// Therefore we cannot match service port to the container port.
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == portName.IntVal && givenOrDefault(port.Protocol) == givenOrDefault(svcPort.Protocol) {
return int(port.ContainerPort), &container, nil
}
}
}
return portName.IntValue(), nil, nil
}
return 0, nil, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
func FindContainerStatus(pod *kube_core.Pod, containerName string) *kube_core.ContainerStatus {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == containerName {
return &cs
}
}
return nil
}
func CopyStringMap(in map[string]string) map[string]string {
if in == nil {
return nil
}
out := make(map[string]string)
for key, value := range in {
out[key] = value
}
return out
}
// MeshOfByAnnotation returns the mesh of the given object according to its own annotations
// or those of its namespace.
func MeshOfByAnnotation(obj kube_meta.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
}
if mesh, exists := metadata.Annotations(namespace.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
}
return model.DefaultMesh
}
// MeshOfByLabelOrAnnotation returns the mesh of the given object according to its own
// annotations or labels or the annotations of its namespace. It treats the annotation
// directly on the object as deprecated.
func MeshOfByLabelOrAnnotation(log logr.Logger, obj kube_client.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetLabels()).GetString(metadata.DubboMeshLabel); exists && mesh != "" {
return mesh
}
if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
log.Info("WARNING: The dubbo.io/mesh annotation is deprecated for this object kind", "name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
return mesh
}
if mesh, exists := metadata.Annotations(namespace.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
}
return model.DefaultMesh
}
// ServiceTag returns the canonical service name for a Kubernetes service,
// optionally with a specific port.
func ServiceTag(name kube_types.NamespacedName, svcPort *int32) string {
port := ""
if svcPort != nil {
port = fmt.Sprintf("_%d", *svcPort)
}
return fmt.Sprintf("%s_%s_svc%s", name.Name, name.Namespace, port)
}