blob: 0c9ffd2b914e6ad584023725867a3aaa4b641224 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"os"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
type Client struct {
// kubernetes connection config
cfg *rest.Config
// the kubernetes interface
rawClient kubernetes.Interface
// current pod config
currentPodName string
ns string
// current resource version
lastResourceVersion string
// the memory watcherSet
watcherSet WatcherSet
// protect the wg && currentPod
lock sync.RWMutex
// current pod status
currentPod *v1.Pod
// protect the watchPods loop && watcher
wg sync.WaitGroup
// manage the client lifecycle
ctx context.Context
cancel context.CancelFunc
}
// load CurrentPodName
func getCurrentPodName() (string, error) {
v := os.Getenv(podNameKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (HOSTNAME)")
}
return v, nil
}
// load CurrentNameSpace
func getCurrentNameSpace() (string, error) {
v := os.Getenv(nameSpaceKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (NAMESPACE)")
}
return v, nil
}
// NewMockClient
// export for registry package test
func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
return newMockClient(namespace, mockClientGenerator)
}
// newMockClient
// new a client for test
func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
rawClient, err := mockClientGenerator()
if err != nil {
return nil, perrors.WithMessage(err, "call mock generator")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
c.lastResourceVersion = c.currentPod.GetResourceVersion()
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
// newClient
// new a client for registry
func newClient(namespace string) (*Client, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, perrors.WithMessage(err, "get in-cluster config")
}
rawClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
func (c *Client) initCurrentPod() (*v1.Pod, error) {
// read the current pod status
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
if err != nil {
if err != ErrDubboLabelAlreadyExist {
return nil, perrors.WithMessage(err, "assemble dubbo label")
}
// current pod don't have label
}
p, err := c.getPatch(oldPod, newPod)
if err != nil {
return nil, perrors.WithMessage(err, "get patch")
}
currentPod, err = c.patchCurrentPod(p)
if err != nil {
return nil, perrors.WithMessage(err, "patch to current pod")
}
return currentPod, nil
}
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to watcherSet
func (c *Client) initWatchSet() error {
pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
}
// set resource version
c.lastResourceVersion = pods.GetResourceVersion()
for _, pod := range pods.Items {
logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
c.handleWatchedPodEvent(&pod, watch.Added)
}
return nil
}
// watchPods
// try to watch kubernetes pods
func (c *Client) watchPods() error {
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
}
watcher.Stop()
c.wg.Add(1)
// add wg, grace close the client
go c.watchPodsLoop()
return nil
}
type resourceVersionGetter interface {
GetResourceVersion() string
}
// watchPods
// try to notify
func (c *Client) watchPodsLoop() {
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
logger.Info("watchPodsLoop goroutine game over")
}()
for {
onceWatch:
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
time.Sleep(2 * time.Second)
continue
}
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion)
for {
select {
// double check ctx
case <-c.ctx.Done():
logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan()))
return
// get one element from result-chan
case event, ok := <-wc.ResultChan():
if !ok {
wc.Stop()
logger.Info("kubernetes watch chan die, create new")
goto onceWatch
}
if event.Type == watch.Error {
// watched a error event
logger.Warnf("kubernetes watch api report err (%#v)", event)
continue
}
o, ok := event.Object.(resourceVersionGetter)
if !ok {
logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object)
continue
}
// record the last resource version avoid to sync all pod
c.lastResourceVersion = o.GetResourceVersion()
logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion)
// check event object type
p, ok := event.Object.(*v1.Pod)
if !ok {
logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object)
continue
}
logger.Debugf("kubernetes got pod %#v", p)
// handle the watched pod
go c.handleWatchedPodEvent(p, event.Type)
}
}
}
}
// handleWatchedPodEvent
// handle watched pod event
func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
for ak, av := range p.GetAnnotations() {
// not dubbo interest annotation
if ak != DubboIOAnnotationKey {
continue
}
ol, err := c.unmarshalRecord(av)
if err != nil {
logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
return
}
for _, o := range ol {
switch eventType {
case watch.Added:
// if pod is added, the record always be create
o.EventType = Create
case watch.Modified:
o.EventType = Update
case watch.Deleted:
o.EventType = Delete
default:
logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
return
}
logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
return
}
}
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
// []*WatcherEvent is nil.
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
return "", perrors.WithMessage(err, "json encode object list")
}
return base64.URLEncoding.EncodeToString(msg), nil
}
// readCurrentPod
// read the current pod status from kubernetes api
func (c *Client) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
return currentPod, nil
}
// Create
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
// the read current pod must be lock, protect every
// create operation can be atomic
c.lock.Lock()
defer c.lock.Unlock()
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
// 2. get patch data
// 3. PATCH the pod
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
updatedPod, err := c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
}
c.currentPod = updatedPod
logger.Debugf("put the @key = %s @value = %s success", k, v)
// not update the watcherSet, the watcherSet should be write by the watchPodsLoop
return nil
}
// patch current pod
// write new meta for current pod
func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch)
if err != nil {
return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
)
oldPod.Labels = make(map[string]string, 8)
newPod.Labels = make(map[string]string, 8)
if currentPod.GetLabels() != nil {
if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
// already have label
return nil, nil, ErrDubboLabelAlreadyExist
}
}
// copy current pod labels to oldPod && newPod
for k, v := range currentPod.GetLabels() {
oldPod.Labels[k] = v
newPod.Labels[k] = v
}
// assign new label for current pod
newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Annotations = make(map[string]string, 8)
newPod.Annotations = make(map[string]string, 8)
for k, v := range currentPod.GetAnnotations() {
oldPod.Annotations[k] = v
newPod.Annotations[k] = v
}
al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
if err != nil {
err = perrors.WithMessage(err, "unmarshal record")
return
}
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
}
newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
return
}
// getPatch
// get the kubernetes pod patch bytes
func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal old pod")
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal newPod pod")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, perrors.WithMessage(err, "create two-way-merge-patch")
}
return patchBytes, nil
}
// GetChildren
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.watcherSet.Get(k, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
}
var kList []string
var vList []string
for _, o := range objectList {
kList = append(kList, o.Key)
vList = append(vList, o.Value)
}
return kList, vList, nil
}
// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(k, false)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
return w.ResultChan(), w.done(), nil
}
// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(prefix, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
return w.ResultChan(), w.done(), nil
}
// Valid
// Valid the client
// if return false, the client is die
func (c *Client) Valid() bool {
select {
case <-c.Done():
return false
default:
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rawClient != nil
}
// Done
// read the client status
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
// Stop
// read the client status
func (c *Client) Close() {
select {
case <-c.ctx.Done():
//already stopped
return
default:
}
c.cancel()
// the client ctx be canceled
// will trigger the watcherSet watchers all stopped
// so, just wait
c.wg.Wait()
}
// ValidateClient
// validate the kubernetes client
func ValidateClient(container clientFacade) error {
client := container.Client()
// new Client
if client == nil || client.Valid() {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
}
container.SetClient(newClient)
}
return nil
}