blob: 1d7aee61698db88b6e109ba4770c9e71fc31f58a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package util
import (
import (
kube_core ""
kube_meta ""
kube_labels ""
kube_types ""
kube_intstr ""
kube_client ""
import (
// ServicePredicate reports whether a Service satisfies some condition. It is
// the filter type accepted by FindServices.
type ServicePredicate func(*kube_core.Service) bool
func MatchServiceThatSelectsPod(pod *kube_core.Pod, ignoredLabels []string) ServicePredicate {
return func(svc *kube_core.Service) bool {
selector := maps.Clone(svc.Spec.Selector)
for _, ignoredLabel := range ignoredLabels {
delete(selector, ignoredLabel)
return kube_labels.SelectorFromSet(selector).Matches(kube_labels.Set(pod.Labels))
// According to K8S docs about Service#selector:
// Route service traffic to pods with label keys and values matching this selector.
// If empty or not present, the service is assumed to have an external process managing its endpoints, which Kubernetes will not modify.
// Only applies to types ClusterIP, NodePort, and LoadBalancer. Ignored if type is ExternalName. More info:
// When converting Pod to Dataplane, we don't want to take into account Services that has no Selector, otherwise any Pod will match this service
// and since we just take any int target port in #util.FindPort every Dataplane in the same namespace as this service would get an extra inbound for it.
func AnySelector() ServicePredicate {
return func(svc *kube_core.Service) bool {
return len(svc.Spec.Selector) > 0
func Not(predicate ServicePredicate) ServicePredicate {
return func(svc *kube_core.Service) bool {
return !predicate(svc)
func FindServices(svcs *kube_core.ServiceList, predicates ...ServicePredicate) []*kube_core.Service {
matching := make([]*kube_core.Service, 0)
for i := range svcs.Items {
svc := &svcs.Items[i]
allMatched := true
for _, predicate := range predicates {
if !predicate(svc) {
allMatched = false
if allMatched {
matching = append(matching, svc)
// Sort by name so that inbound order is similar across zones regardless of the order services got created.
sort.Slice(matching, func(i, j int) bool {
return matching[i].Name < matching[j].Name
return matching
// FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
func FindPort(pod *kube_core.Pod, svcPort *kube_core.ServicePort) (int, *kube_core.Container, error) {
givenOrDefault := func(value kube_core.Protocol) kube_core.Protocol {
if value != "" {
return value
return kube_core.ProtocolTCP
portName := svcPort.TargetPort
switch portName.Type {
case kube_intstr.String:
name := portName.StrVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && givenOrDefault(port.Protocol) == givenOrDefault(svcPort.Protocol) {
return int(port.ContainerPort), &container, nil
case kube_intstr.Int:
// According to K8S docs about Container#ports:
// List of ports to expose from the container. Exposing a port here gives the system additional information about the network connections a container uses, but is primarily informational.
// Not specifying a port here DOES NOT prevent that port from being exposed. Any port which is listening on the default "" address inside a container will be accessible from the network
// Therefore we cannot match service port to the container port.
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == portName.IntVal && givenOrDefault(port.Protocol) == givenOrDefault(svcPort.Protocol) {
return int(port.ContainerPort), &container, nil
return portName.IntValue(), nil, nil
return 0, nil, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
func FindContainerStatus(pod *kube_core.Pod, containerName string) *kube_core.ContainerStatus {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == containerName {
return &cs
return nil
// CopyStringMap returns a shallow copy of in. A nil map is returned as nil; a
// non-nil map, even an empty one, yields a new non-nil map, so changes to the
// result never affect in.
func CopyStringMap(in map[string]string) map[string]string {
	// maps.Clone preserves nil and copies the entries into a new map.
	return maps.Clone(in)
}
// MeshOfByAnnotation returns the mesh of the given object according to its own annotations
// or those of its namespace.
func MeshOfByAnnotation(obj kube_meta.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
if mesh, exists := metadata.Annotations(namespace.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
return model.DefaultMesh
// MeshOfByLabelOrAnnotation returns the mesh of the given object according to its own
// annotations or labels or the annotations of its namespace. It treats the annotation
// directly on the object as deprecated.
func MeshOfByLabelOrAnnotation(log logr.Logger, obj kube_client.Object, namespace *kube_core.Namespace) string {
if mesh, exists := metadata.Annotations(obj.GetLabels()).GetString(metadata.DubboMeshLabel); exists && mesh != "" {
return mesh
if mesh, exists := metadata.Annotations(obj.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
log.Info("WARNING: The annotation is deprecated for this object kind", "name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
return mesh
if mesh, exists := metadata.Annotations(namespace.GetAnnotations()).GetString(metadata.DubboMeshAnnotation); exists && mesh != "" {
return mesh
return model.DefaultMesh
// ServiceTag returns the canonical service name for a Kubernetes service,
// optionally with a specific port.
func ServiceTag(name kube_types.NamespacedName, svcPort *int32) string {
port := ""
if svcPort != nil {
port = fmt.Sprintf("_%d", *svcPort)
return fmt.Sprintf("%s_%s_svc%s", name.Name, name.Namespace, port)