blob: 91e483c2f92f09cbb84ed6ba1831cf1ced080013 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"encoding/json"
"fmt"
"strings"
)
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
"github.com/apache/dubbo-go-pixiu/pkg/config/constants"
"github.com/apache/dubbo-go-pixiu/pkg/config/host"
"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
)
func hasProxyIP(addresses []v1.EndpointAddress, proxyIP string) bool {
for _, addr := range addresses {
if addr.IP == proxyIP {
return true
}
}
return false
}
func getLabelValue(metadata metav1.Object, label string, fallBackLabel string) string {
metaLabels := metadata.GetLabels()
val := metaLabels[label]
if val != "" {
return val
}
return metaLabels[fallBackLabel]
}
// Forked from Kubernetes k8s.io/kubernetes/pkg/api/v1/pod
// FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {
portName := svcPort.TargetPort
switch portName.Type {
case intstr.String:
name := portName.StrVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && port.Protocol == svcPort.Protocol {
return int(port.ContainerPort), nil
}
}
}
case intstr.Int:
return portName.IntValue(), nil
}
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
// findPortFromMetadata resolves the TargetPort of a Service Port, by reading the Pod spec.
func findPortFromMetadata(svcPort v1.ServicePort, podPorts []model.PodPort) (int, error) {
target := svcPort.TargetPort
switch target.Type {
case intstr.String:
name := target.StrVal
for _, port := range podPorts {
if port.Name == name && port.Protocol == string(svcPort.Protocol) {
return port.ContainerPort, nil
}
}
case intstr.Int:
// For a direct reference we can just return the port number
return target.IntValue(), nil
}
return 0, fmt.Errorf("no matching port found for %+v", svcPort)
}
type serviceTargetPort struct {
// the mapped port number, or 0 if unspecified
num int
// the mapped port name
name string
// a bool indicating if the mapped port name was explicitly set on the TargetPort field, or inferred from k8s' port.Name
explicitName bool
}
func findServiceTargetPort(servicePort *model.Port, k8sService *v1.Service) serviceTargetPort {
for _, p := range k8sService.Spec.Ports {
// TODO(@hzxuzhonghu): check protocol as well as port
if p.Name == servicePort.Name || p.Port == int32(servicePort.Port) {
if p.TargetPort.Type == intstr.Int && p.TargetPort.IntVal > 0 {
return serviceTargetPort{num: int(p.TargetPort.IntVal), name: p.Name, explicitName: false}
}
return serviceTargetPort{num: 0, name: p.TargetPort.StrVal, explicitName: true}
}
}
// should never happen
log.Debugf("did not find matching target port for %v on service %s", servicePort, k8sService.Name)
return serviceTargetPort{num: 0, name: "", explicitName: false}
}
func getPodServices(s listerv1.ServiceLister, pod *v1.Pod) ([]*v1.Service, error) {
allServices, err := s.Services(pod.Namespace).List(klabels.Everything())
if err != nil {
return nil, err
}
var services []*v1.Service
for _, service := range allServices {
if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}
selector := klabels.Set(service.Spec.Selector).AsSelectorPreValidated()
if selector.Matches(klabels.Set(pod.Labels)) {
services = append(services, service)
}
}
return services, nil
}
func portsEqual(a, b []v1.EndpointPort) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i].Name != b[i].Name || a[i].Port != b[i].Port || a[i].Protocol != b[i].Protocol ||
ptrValueOrEmpty(a[i].AppProtocol) != ptrValueOrEmpty(b[i].AppProtocol) {
return false
}
}
return true
}
func addressesEqual(a, b []v1.EndpointAddress) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i].IP != b[i].IP || a[i].Hostname != b[i].Hostname ||
ptrValueOrEmpty(a[i].NodeName) != ptrValueOrEmpty(b[i].NodeName) {
return false
}
}
return true
}
func ptrValueOrEmpty(ptr *string) string {
if ptr != nil {
return *ptr
}
return ""
}
func getNodeSelectorsForService(svc *v1.Service) labels.Instance {
if nodeSelector := svc.Annotations[kube.NodeSelectorAnnotation]; nodeSelector != "" {
var nodeSelectorKV map[string]string
if err := json.Unmarshal([]byte(nodeSelector), &nodeSelectorKV); err != nil {
log.Debugf("failed to unmarshal node selector annotation value for service %s.%s: %v",
svc.Name, svc.Namespace, err)
}
return nodeSelectorKV
}
return nil
}
func nodeEquals(a, b kubernetesNode) bool {
return a.address == b.address && a.labels.Equals(b.labels)
}
func isNodePortGatewayService(svc *v1.Service) bool {
if svc == nil {
return false
}
_, ok := svc.Annotations[kube.NodeSelectorAnnotation]
return ok && svc.Spec.Type == v1.ServiceTypeNodePort
}
// Get the pod key of the proxy which can be used to get pod from the informer cache
func podKeyByProxy(proxy *model.Proxy) string {
parts := strings.Split(proxy.ID, ".")
if len(parts) == 2 && proxy.Metadata.Namespace == parts[1] {
return kube.KeyFunc(parts[0], parts[1])
}
return ""
}
func convertToService(obj interface{}) (*v1.Service, error) {
cm, ok := obj.(*v1.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
}
cm, ok = tombstone.Obj.(*v1.Service)
if !ok {
return nil, fmt.Errorf("tombstone contained object that is not a Service %#v", obj)
}
}
return cm, nil
}
func namespacedNameForService(svc *model.Service) types.NamespacedName {
return types.NamespacedName{
Namespace: svc.Attributes.Namespace,
Name: svc.Attributes.Name,
}
}
// serviceClusterSetLocalHostname produces Kubernetes Multi-Cluster Services (MCS) ClusterSet FQDN for a k8s service
func serviceClusterSetLocalHostname(nn types.NamespacedName) host.Name {
return host.Name(nn.Name + "." + nn.Namespace + "." + "svc" + "." + constants.DefaultClusterSetLocalDomain)
}
// serviceClusterSetLocalHostnameForKR calls serviceClusterSetLocalHostname with the name and namespace of the given kubernetes resource.
func serviceClusterSetLocalHostnameForKR(obj metav1.Object) host.Name {
return serviceClusterSetLocalHostname(types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()})
}
func labelRequirement(key string, op selection.Operator, vals []string, opts ...field.PathOption) *klabels.Requirement {
out, err := klabels.NewRequirement(key, op, vals, opts...)
if err != nil {
panic(fmt.Sprintf("failed creating requirements for Service: %v", err))
}
return out
}