blob: 2f6068e5b6e53fd6fc4587148901393e1d8ad3fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
)
import (
"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/plugins/runtime/k8s/metadata"
)
var NodePortAddressPriority = []kube_core.NodeAddressType{
kube_core.NodeExternalIP,
kube_core.NodeInternalIP,
}
func (p *PodConverter) IngressFor(
ctx context.Context, zoneIngress *mesh_proto.ZoneIngress, pod *kube_core.Pod, services []*kube_core.Service,
) error {
if len(services) != 1 {
return errors.Errorf("ingress should be matched by exactly one service. Matched %d services", len(services))
}
ifaces, err := p.InboundConverter.InboundInterfacesFor(ctx, p.Zone, pod, services)
if err != nil {
return errors.Wrap(err, "could not generate inbound interfaces")
}
if len(ifaces) != 1 {
return errors.Errorf("generated %d inbound interfaces, expected 1. Interfaces: %v", len(ifaces), ifaces)
}
if zoneIngress.Networking == nil {
zoneIngress.Networking = &mesh_proto.ZoneIngress_Networking{}
}
zoneIngress.Networking.Address = pod.Status.PodIP
zoneIngress.Networking.Port = ifaces[0].Port
coords, err := p.coordinatesFromAnnotations(pod.Annotations)
if err != nil {
return err
}
if coords == nil { // if ingress public coordinates were not present in annotations we will try to pick it from service
coords, err = p.coordinatesFromService(ctx, services[0])
if err != nil {
return err
}
}
if coords != nil {
zoneIngress.Networking.AdvertisedAddress = coords.address
zoneIngress.Networking.AdvertisedPort = coords.port
}
adminPort, exist, err := metadata.Annotations(pod.Annotations).GetUint32(metadata.DubboEnvoyAdminPort)
if err != nil {
return err
}
if exist {
zoneIngress.Networking.Admin = &mesh_proto.EnvoyAdmin{Port: adminPort}
}
return nil
}
type coordinates struct {
address string
port uint32
}
func (p *PodConverter) coordinatesFromAnnotations(annotations metadata.Annotations) (*coordinates, error) {
publicAddress, addressExist := annotations.GetString(metadata.DubboIngressPublicAddressAnnotation)
publicPort, portExist, err := annotations.GetUint32(metadata.DubboIngressPublicPortAnnotation)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse annotation %s", metadata.DubboIngressPublicPortAnnotation)
}
if addressExist != portExist {
return nil, errors.Errorf("both %s and %s has to be defined", metadata.DubboIngressPublicAddressAnnotation, metadata.DubboIngressPublicPortAnnotation)
}
if addressExist && portExist {
return &coordinates{
address: publicAddress,
port: publicPort,
}, nil
}
return nil, nil
}
// coordinatesFromService is trying to generate ingress with public address and port using Service that selects the ingress
func (p *PodConverter) coordinatesFromService(ctx context.Context, service *kube_core.Service) (*coordinates, error) {
switch service.Spec.Type {
case kube_core.ServiceTypeLoadBalancer:
return p.coordinatesFromLoadBalancer(service)
case kube_core.ServiceTypeNodePort:
return p.coordinatesFromNodePort(ctx, service)
default:
converterLog.Info("ingress service type is not public, therefore the public coordinates of the ingress will not be automatically set. Change the ingress service to LoadBalancer or NodePort or override the settings using annotations.")
return nil, nil
}
}
func (p *PodConverter) coordinatesFromNodePort(ctx context.Context, service *kube_core.Service) (*coordinates, error) {
nodes := &kube_core.NodeList{}
if err := p.NodeGetter.List(ctx, nodes); err != nil {
return nil, err
}
if len(nodes.Items) < 1 { // this should not happen, K8S always has at least one node
return nil, errors.New("there are no nodes")
}
for _, addressType := range NodePortAddressPriority {
for _, address := range nodes.Items[0].Status.Addresses {
if address.Type == addressType {
coords := &coordinates{
address: address.Address,
port: uint32(service.Spec.Ports[0].NodePort),
}
return coords, nil
}
}
}
return nil, errors.New("could not find valid Node address for Ingress publicAddress")
}
func (p *PodConverter) coordinatesFromLoadBalancer(service *kube_core.Service) (*coordinates, error) {
if len(service.Status.LoadBalancer.Ingress) == 0 {
converterLog.V(1).Info("load balancer for ingress is not yet ready")
return nil, nil
}
publicAddress := ""
if service.Status.LoadBalancer.Ingress[0].Hostname != "" {
publicAddress = service.Status.LoadBalancer.Ingress[0].Hostname
}
if service.Status.LoadBalancer.Ingress[0].IP != "" {
publicAddress = service.Status.LoadBalancer.Ingress[0].IP
}
if publicAddress == "" {
converterLog.V(1).Info("load balancer for ingress is not yet ready. Hostname and IP are empty")
return nil, nil
}
coords := &coordinates{
address: publicAddress,
port: uint32(service.Spec.Ports[0].Port), // service has to have port, otherwise we would not generate inbound
}
return coords, nil
}