| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package controllers |
| |
| import ( |
| "context" |
| "strconv" |
| "strings" |
| ) |
| |
| import ( |
| "github.com/pkg/errors" |
| |
| kube_core "k8s.io/api/core/v1" |
| |
| kube_client "sigs.k8s.io/controller-runtime/pkg/client" |
| ) |
| |
| import ( |
| mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1" |
| core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh" |
| mesh_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/api/v1alpha1" |
| ) |
| |
| func (p *PodConverter) OutboundInterfacesFor( |
| ctx context.Context, |
| pod *kube_core.Pod, |
| others []*mesh_k8s.Dataplane, |
| reachableServices []string, |
| ) ([]*mesh_proto.Dataplane_Networking_Outbound, error) { |
| var outbounds []*mesh_proto.Dataplane_Networking_Outbound |
| |
| reachableServicesMap := map[string]bool{} |
| for _, service := range reachableServices { |
| reachableServicesMap[service] = true |
| } |
| |
| var dataplanes []*core_mesh.DataplaneResource |
| for _, other := range others { |
| dp := core_mesh.NewDataplaneResource() |
| if err := p.ResourceConverter.ToCoreResource(other, dp); err != nil { |
| converterLog.Error(err, "failed to parse Dataplane", "dataplane", other.Spec) |
| continue // one invalid Dataplane definition should not break the entire mesh |
| } |
| dataplanes = append(dataplanes, dp) |
| } |
| |
| endpoints := endpointsByService(dataplanes) |
| for _, serviceTag := range endpoints.Services() { |
| service, port, err := k8sService(ctx, serviceTag, p.ServiceGetter) |
| if err != nil { |
| converterLog.Error(err, "could not get K8S Service for service tag") |
| continue // one invalid Dataplane definition should not break the entire mesh |
| } |
| if len(reachableServices) > 0 && !reachableServicesMap[serviceTag] { |
| continue // ignore generating outbound if reachable services are defined and this one is not on the list |
| } |
| |
| // Do not generate outbounds for service-less |
| if isServiceLess(port) { |
| continue |
| } |
| |
| // Do not generate hostnames for ExternalName Service |
| if isExternalNameService(service) { |
| converterLog.V(1).Info( |
| "ignoring outbound generation for unsupported ExternalName Service", |
| "name", service.GetName(), |
| "namespace", service.GetNamespace(), |
| ) |
| continue |
| } |
| |
| if isHeadlessService(service) { |
| // Generate outbound listeners for every endpoint of services. |
| for _, endpoint := range endpoints[serviceTag] { |
| if endpoint.Address == pod.Status.PodIP { |
| continue // ignore generating outbound for itself, otherwise we've got a conflict with inbound |
| } |
| outbounds = append(outbounds, &mesh_proto.Dataplane_Networking_Outbound{ |
| Address: endpoint.Address, |
| Port: endpoint.Port, |
| Tags: map[string]string{ |
| mesh_proto.ServiceTag: serviceTag, |
| mesh_proto.InstanceTag: endpoint.Instance, |
| }, |
| }) |
| } |
| } else { |
| // generate outbound based on ClusterIP |
| outbounds = append(outbounds, &mesh_proto.Dataplane_Networking_Outbound{ |
| Address: service.Spec.ClusterIP, |
| Port: port, |
| Tags: map[string]string{ |
| mesh_proto.ServiceTag: serviceTag, |
| }, |
| }) |
| } |
| } |
| return outbounds, nil |
| } |
| |
| func isHeadlessService(svc *kube_core.Service) bool { |
| return svc.Spec.ClusterIP == kube_core.ClusterIPNone |
| } |
| |
| // Services of ExternalName type should not have any selectors. |
| // Kubernetes does not validate this, so in rare cases, a service of |
| // ExternalName type could point to a workload inside the mesh. If this |
| // happens, we will add the service to the VIPs config map, but we will |
| // not be able to obtain its IP address. As a result, the key in the map |
| // will be incorrect (e.g., "1:"). We do not currently support |
| // ExternalName services, so we can safely skip them from processing. |
| func isExternalNameService(svc *kube_core.Service) bool { |
| return svc != nil && svc.Spec.Type == kube_core.ServiceTypeExternalName |
| } |
| |
| func isServiceLess(port uint32) bool { |
| return port == mesh_proto.TCPPortReserved |
| } |
| |
| func k8sService(ctx context.Context, serviceTag string, client kube_client.Reader) (*kube_core.Service, uint32, error) { |
| name, ns, port, err := parseService(serviceTag) |
| if err != nil { |
| return nil, 0, errors.Wrapf(err, "failed to parse `service` host %q as FQDN", serviceTag) |
| } |
| if isServiceLess(port) { |
| return nil, port, err |
| } |
| |
| svc := &kube_core.Service{} |
| svcKey := kube_client.ObjectKey{Namespace: ns, Name: name} |
| if err := client.Get(ctx, svcKey, svc); err != nil { |
| return nil, 0, errors.Wrapf(err, "failed to get Service %q", svcKey) |
| } |
| return svc, port, nil |
| } |
| |
| func parseService(host string) (string, string, uint32, error) { |
| // split host into <name>_<namespace>_svc_<port> |
| segments := strings.Split(host, "_") |
| |
| var port uint32 |
| switch len(segments) { |
| case 4: |
| p, err := strconv.ParseInt(segments[3], 10, 32) |
| if err != nil { |
| return "", "", 0, err |
| } |
| port = uint32(p) |
| case 3: |
| // service less service names have no port, so we just put the reserved |
| // one here to note that this service is actually |
| port = mesh_proto.TCPPortReserved |
| default: |
| return "", "", 0, errors.Errorf("service tag in unexpected format") |
| } |
| |
| name, namespace := segments[0], segments[1] |
| return name, namespace, port, nil |
| } |