blob: c92f73e2d65aa2a815d995b6e5ca681b7216d280 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adaptor
import (
"context"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/alarm"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"os"
"sync"
"time"
)
var (
client *K8sClient
clientOnce sync.Once
)
type K8sType string
type K8sClient struct {
// eventFuncs is used for store functions called k8s event handler
eventFuncs util.ConcurrentMap
// ipIndex is used for index pod object; key is ip, value is pod full name
ipIndex util.ConcurrentMap
kubeClient kubernetes.Interface
services ListWatcher
endpoints ListWatcher
nodes ListWatcher
pods ListWatcher
ready chan struct{}
stopCh chan struct{}
goroutine *gopool.Pool
}
func (c *K8sClient) init() (err error) {
c.ready = make(chan struct{})
c.stopCh = make(chan struct{})
c.goroutine = gopool.New(context.Background())
// if KUBERNETES_CONFIG_PATH is unset, then service center must be deployed in the same k8s cluster
c.kubeClient, err = createKubeClient(os.Getenv("KUBERNETES_CONFIG_PATH"))
if err != nil {
log.Errorf(err, "create kube client failed")
return
}
// if KUBERNETES_NAMESPACE is unset, then list watch all namespaces
listerFactory := informers.NewFilteredSharedInformerFactory(
c.kubeClient, defaultResyncInterval, os.Getenv("KUBERNETES_NAMESPACE"), nil)
c.services = c.newListWatcher(TypeService, listerFactory.Core().V1().Services().Informer())
c.endpoints = c.newListWatcher(TypeEndpoint, listerFactory.Core().V1().Endpoints().Informer())
c.nodes = c.newListWatcher(TypeNode, listerFactory.Core().V1().Nodes().Informer())
c.pods = c.newListWatcher(TypePod, listerFactory.Core().V1().Pods().Informer())
// append ipIndex build function
c.AppendEventFunc(TypePod, c.onPodEvent)
return
}
func (c *K8sClient) newListWatcher(t K8sType, lister cache.SharedIndexInformer) (lw ListWatcher) {
lw = NewListWatcher(t, lister, c.getEvent(t))
return
}
func (c *K8sClient) getEvent(t K8sType) OnEventFunc {
return func(evt K8sEvent) {
fs, ok := c.eventFuncs.Get(t)
if !ok {
return
}
for _, f := range fs.([]OnEventFunc) {
f(evt)
}
}
}
// onPodEvent is method to build ipIndex
func (c *K8sClient) onPodEvent(evt K8sEvent) {
pod, ok := evt.Object.(*v1.Pod)
if !ok {
deletedState, ok := evt.Object.(cache.DeletedFinalStateUnknown)
if !ok {
log.Warnf("event object is not a pod %#v", evt.Object)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
log.Warnf("deletedState is not a pod %#v", evt.Object)
return
}
}
if len(pod.Status.PodIP) == 0 {
return
}
podName := getFullName(pod.Namespace, pod.Name)
switch evt.EventType {
case pb.EVT_CREATE, pb.EVT_UPDATE:
switch pod.Status.Phase {
case v1.PodPending, v1.PodRunning:
c.ipIndex.Put(pod.Status.PodIP, podName)
default:
}
case pb.EVT_DELETE:
c.ipIndex.Remove(pod.Status.PodIP)
}
}
// unsafe
func (c *K8sClient) AppendEventFunc(t K8sType, f OnEventFunc) {
itf, _ := c.eventFuncs.Fetch(t, func() (interface{}, error) {
return []OnEventFunc{}, nil
})
fs := itf.([]OnEventFunc)
fs = append(fs, f)
c.eventFuncs.Put(t, fs)
}
func (c *K8sClient) waitForSync(lw ListWatcher) ListWatcher {
<-c.ready
return lw
}
func (c *K8sClient) Services() ListWatcher {
return c.waitForSync(c.services)
}
func (c *K8sClient) Endpoints() ListWatcher {
return c.waitForSync(c.endpoints)
}
func (c *K8sClient) Pods() ListWatcher {
return c.waitForSync(c.pods)
}
func (c *K8sClient) Nodes() ListWatcher {
return c.waitForSync(c.nodes)
}
func (c *K8sClient) GetDomainProject() string {
return defaultDomainProject
}
func (c *K8sClient) GetService(namespace, name string) (svc *v1.Service) {
obj, ok, err := c.Services().GetStore().GetByKey(getFullName(namespace, name))
if err != nil {
log.Errorf(err, "get k8s service[%s/%s] failed", namespace, name)
return
}
if !ok {
return
}
svc = obj.(*v1.Service)
return
}
func (c *K8sClient) GetEndpoints(namespace, name string) (ep *v1.Endpoints) {
obj, ok, err := c.Endpoints().GetStore().GetByKey(getFullName(namespace, name))
if err != nil {
log.Errorf(err, "get k8s endpoints[%s/%s] failed", namespace, name)
return
}
if !ok {
return
}
ep = obj.(*v1.Endpoints)
return
}
func (c *K8sClient) GetPodByIP(ip string) (pod *v1.Pod) {
itf, ok := c.ipIndex.Get(ip)
if !ok {
return
}
key := itf.(string)
itf, ok, err := c.Pods().GetStore().GetByKey(key)
if err != nil {
log.Errorf(err, "get k8s pod[%s] by ip[%s] failed", key, ip)
}
if !ok {
return
}
pod = itf.(*v1.Pod)
return
}
func (c *K8sClient) GetNodeByPod(pod *v1.Pod) (node *v1.Node) {
itf, ok, err := c.Nodes().GetStore().GetByKey(pod.Spec.NodeName)
if err != nil {
log.Errorf(err, "get k8s node[%s] by pod[%s/%s] failed", pod.Spec.NodeName, pod.Namespace, pod.Name)
return
}
if !ok {
return
}
node = itf.(*v1.Node)
return
}
func (c *K8sClient) Run() {
if err := c.init(); err != nil {
alarm.Raise(alarm.IdBackendConnectionRefuse,
alarm.AdditionalContext("%v", err))
return
}
alarm.Clear(alarm.IdBackendConnectionRefuse)
c.goroutine.
Do(func(_ context.Context) { c.services.Run(c.stopCh) }).
Do(func(_ context.Context) { c.endpoints.Run(c.stopCh) }).
Do(func(_ context.Context) { c.pods.Run(c.stopCh) }).
Do(func(_ context.Context) { c.nodes.Run(c.stopCh) }).
Do(func(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(minWaitInterval):
util.SafeCloseChan(c.ready)
}
})
}
func (c *K8sClient) Stop() {
close(c.stopCh)
c.goroutine.Close(true)
log.Debug("kube client is stopped")
}
func (c *K8sClient) Ready() <-chan struct{} {
return c.ready
}
func Kubernetes() *K8sClient {
clientOnce.Do(func() {
client = &K8sClient{}
client.Run()
})
return client
}