// blob: 27061a935097c8f9a7cda471c13c4bff59c3ce46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v1alpha1
import (
"context"
"fmt"
"strconv"
"time"
)
import (
"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
dubbov1alpha1 "istio.io/api/dubbo/v1alpha1"
istioextensionsv1alpha1 "istio.io/api/extensions/v1alpha1"
"istio.io/client-go/pkg/apis/extensions/v1alpha1"
apierror "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
)
// ServiceMetadataServer serves the dubbo v1alpha1 ServiceMetadataService gRPC API.
// Publish requests are queued on ch and written to ServiceMetadata resources by
// Push via the debounce helper; Get reads those resources back through KubeClient.
type ServiceMetadataServer struct {
// Embedded generated stub so that RPCs not overridden by this type still satisfy the service interface.
dubbov1alpha1.UnimplementedServiceMetadataServiceServer
// KubeClient is the client used to read, create, update, list and delete
// ServiceMetadata resources and to list namespaces.
KubeClient kube.Client
// CommittedUpdates is a counter that is handed to the debounce helper together
// with the Push callback.
// NOTE(review): the previous wording of this comment was inherited from the
// discovery server and referred to an InboundUpdates field that this type does
// not have; presumably this counts update batches that have been processed —
// confirm against DebounceHelper.Debounce.
CommittedUpdates *atomic.Int64
// ch carries publish requests from Publish to the debounce loop.
ch chan *pushRequest
// debounceOptions holds the debounce settings passed to the debounce helper.
debounceOptions debounceOptions
// debounceHelper runs the debounce loop (see debounce).
debounceHelper DebounceHelper
}
// pushRequest is a batch of pending metadata publications waiting to be
// written by Push. Batches are combined with Merge.
type pushRequest struct {
// ConfigsUpdated maps a config key to the metadata to store. Publish builds
// the key from the request namespace and a name derived from the application
// name and revision (see getConfigKeyName).
ConfigsUpdated map[model.ConfigKey]metadataConfig
}
// metadataConfig is one published metadata entry as captured by Publish.
type metadataConfig struct {
// applicationName is the application name taken from the publish request.
applicationName string
// revision is the metadata revision taken from the publish request.
revision string
// metadataInfo is the metadata payload taken from the publish request; Push
// stores it in the resource's Spec.MetadataInfo.
metadataInfo string
// timestamp is the time Publish received the request. Merge uses it to keep
// the newest entry per key, and getOrCreateCRD writes it (as Unix seconds) to
// the "updateTime" annotation.
timestamp time.Time
}
// Merge folds other into pr and returns the combined request.
//
// If either request is nil the other one is returned unchanged. Otherwise the
// entries of other are copied into pr; when both requests contain the same key
// the entry with the strictly later timestamp wins, so an older publication
// never overwrites a newer one.
//
// An empty ConfigsUpdated map simply contributes no entries. Previously, an
// empty map on either side reset the merged map to nil, which silently dropped
// every pending publication because Push only writes the keys it finds in
// ConfigsUpdated.
//
// The receiver is modified in place and returned.
func (pr *pushRequest) Merge(other *pushRequest) *pushRequest {
	if pr == nil {
		return other
	}
	if other == nil {
		return pr
	}
	if len(other.ConfigsUpdated) == 0 {
		// Nothing to add; keep whatever pr already holds.
		return pr
	}
	if pr.ConfigsUpdated == nil {
		// Writing to a nil map panics, so allocate before copying.
		pr.ConfigsUpdated = make(map[model.ConfigKey]metadataConfig, len(other.ConfigsUpdated))
	}
	for key, incoming := range other.ConfigsUpdated {
		if existing, ok := pr.ConfigsUpdated[key]; ok && !incoming.timestamp.After(existing.timestamp) {
			// The entry already held is at least as recent; keep it.
			continue
		}
		pr.ConfigsUpdated[key] = incoming
	}
	return pr
}
// NewServiceMetadataServer builds a ServiceMetadataServer that talks to the
// cluster through client. The request channel is buffered with room for 10
// pending requests, the update counter starts at zero, and the debounce
// settings are read from the SMDebounceAfter, SMDebounceMax and
// SMEnableDebounce feature values.
func NewServiceMetadataServer(client kube.Client) *ServiceMetadataServer {
	opts := debounceOptions{
		debounceAfter:  features.SMDebounceAfter,
		debounceMax:    features.SMDebounceMax,
		enableDebounce: features.SMEnableDebounce,
	}

	srv := new(ServiceMetadataServer)
	srv.CommittedUpdates = atomic.NewInt64(0)
	srv.KubeClient = client
	srv.ch = make(chan *pushRequest, 10)
	srv.debounceOptions = opts
	return srv
}
// Start launches the server's two background loops, each in its own
// goroutine: the update handler and the outdated-resource cleaner. Both
// receive stopCh. Calling Start on a nil server only logs a warning.
func (s *ServiceMetadataServer) Start(stopCh <-chan struct{}) {
	if s == nil {
		log.Warn("ServiceMetadataServer is not init, skip start")
		return
	}

	loops := []func(<-chan struct{}){
		s.handleUpdate,
		s.removeOutdatedCRD,
	}
	for _, loop := range loops {
		go loop(stopCh)
	}
}
// Register attaches s to the gRPC server rpcs as the implementation of the
// dubbo v1alpha1 ServiceMetadataService.
func (s *ServiceMetadataServer) Register(rpcs *grpc.Server) {
// Delegate to the generated registration function for the service.
dubbov1alpha1.RegisterServiceMetadataServiceServer(rpcs, s)
}
// Publish queues the metadata carried by request for asynchronous storage.
//
// The request is turned into a single-entry pushRequest, keyed by the request
// namespace and the name produced by getConfigKeyName, stamped with the
// current time, and sent on the server's request channel. The actual write is
// performed later by Push.
//
// Returns an empty response once the request has been queued. If ctx is
// cancelled or its deadline expires before the request can be queued (for
// example because the channel buffer is full), the RPC fails with the gRPC
// status corresponding to the context error instead of blocking indefinitely.
func (s *ServiceMetadataServer) Publish(ctx context.Context, request *dubbov1alpha1.PublishServiceMetadataRequest) (*dubbov1alpha1.PublishServiceMetadataResponse, error) {
	key := model.ConfigKey{
		Name:      getConfigKeyName(request.GetApplicationName(), request.GetRevision()),
		Namespace: request.GetNamespace(),
	}
	pushReq := &pushRequest{
		ConfigsUpdated: map[model.ConfigKey]metadataConfig{
			key: {
				applicationName: request.GetApplicationName(),
				revision:        request.GetRevision(),
				metadataInfo:    request.GetMetadataInfo(),
				timestamp:       time.Now(),
			},
		},
	}

	// Honour the caller's context while waiting for room on the channel so a
	// stalled consumer cannot pin the RPC goroutine forever.
	select {
	case s.ch <- pushReq:
		return &dubbov1alpha1.PublishServiceMetadataResponse{}, nil
	case <-ctx.Done():
		return nil, status.FromContextError(ctx.Err()).Err()
	}
}
// getConfigKeyName returns the resource name for an application revision, in
// the form "<applicationName>-<revision>", and logs both inputs at info level.
//
// Because the two parts are joined with a plain "-", distinct inputs can map
// to the same name (for example "a-b"/"c" and "a"/"b-c").
func getConfigKeyName(applicationName, revision string) string {
	log.Infof("application name: %s, revision: %s", applicationName, revision)
	name := fmt.Sprintf("%s-%s", applicationName, revision)
	return name
}
// Get looks up the stored metadata for the application name and revision in
// request, within the request's namespace, and returns its MetadataInfo.
//
// Any failure of the underlying lookup (including "not found") is reported as
// a gRPC status with code Aborted whose message is the lookup error text.
func (s *ServiceMetadataServer) Get(ctx context.Context, request *dubbov1alpha1.GetServiceMetadataRequest) (*dubbov1alpha1.GetServiceMetadataResponse, error) {
	name := getConfigKeyName(request.GetApplicationName(), request.GetRevision())
	// Use the generated getter rather than the raw field so a nil request does
	// not panic, matching how the other request fields are read.
	metadata, err := s.KubeClient.Istio().ExtensionsV1alpha1().ServiceMetadatas(request.GetNamespace()).Get(ctx, name, v1.GetOptions{})
	if err != nil {
		// status.Error, not status.Errorf: the error text is data, not a format
		// string, and may legitimately contain '%'.
		return nil, status.Error(codes.Aborted, err.Error())
	}
	return &dubbov1alpha1.GetServiceMetadataResponse{MetadataInfo: metadata.Spec.GetMetadataInfo()}, nil
}
// handleUpdate runs the debounce loop with stopCh. Start launches it in its
// own goroutine.
func (s *ServiceMetadataServer) handleUpdate(stopCh <-chan struct{}) {
s.debounce(stopCh)
}
// Push writes every entry of req to the cluster.
//
// For each entry the matching ServiceMetadata resource is fetched (or created
// when missing) via getOrCreateCRD, its MetadataInfo is set to the entry's
// payload, and the resource is updated. Failures are logged and the affected
// entry is skipped; the remaining entries are still processed, so one bad
// entry does not discard the rest of the batch (map iteration order is
// unspecified, so which entries would have been lost was arbitrary).
//
// A nil req is a no-op.
func (s *ServiceMetadataServer) Push(req *pushRequest) {
	if req == nil {
		return
	}
	ctx := context.TODO()
	for key, config := range req.ConfigsUpdated {
		metadata, err := getOrCreateCRD(s.KubeClient, ctx, key.Namespace, config.applicationName, config.revision, config.metadataInfo, config.timestamp)
		if err != nil {
			log.Errorf("Failed to getOrCreateCRD, %s", err.Error())
			continue
		}
		metadata.Spec.MetadataInfo = config.metadataInfo
		if _, err = s.KubeClient.Istio().ExtensionsV1alpha1().ServiceMetadatas(metadata.Namespace).Update(ctx, metadata, v1.UpdateOptions{}); err != nil {
			log.Errorf("Failed to update metadata, %s", err.Error())
			continue
		}
	}
}
// getOrCreateCRD returns the ServiceMetadata resource for the given
// application name and revision in namespace, creating it when it does not
// exist yet.
//
//   - When the resource exists, its "updateTime" annotation is set on the
//     returned object to createTime in Unix seconds (the annotation map is
//     allocated if needed). The change is made only on the returned object;
//     this function does not write it back.
//   - When the lookup reports "not found", a new resource is created with the
//     given application name, revision, metadata info and the same
//     "updateTime" annotation, and the created object is returned.
//   - Any other lookup error, or a failure to create, is returned wrapped.
//
// The result of the lookup is logged at info level before the error is
// inspected.
func getOrCreateCRD(kubeClient kube.Client, ctx context.Context, namespace string, applicationName, revision, metadataInfo string, createTime time.Time) (*v1alpha1.ServiceMetadata, error) {
	api := kubeClient.Istio().ExtensionsV1alpha1().ServiceMetadatas(namespace)
	name := getConfigKeyName(applicationName, revision)
	stamp := strconv.FormatInt(createTime.Unix(), 10)

	found, err := api.Get(ctx, name, v1.GetOptions{})
	log.Infof("metadata: %+v", found)

	switch {
	case err == nil:
		// Already stored: refresh the timestamp annotation on the fetched copy.
		if found.ObjectMeta.Annotations == nil {
			found.ObjectMeta.Annotations = map[string]string{}
		}
		found.ObjectMeta.Annotations["updateTime"] = stamp
		return found, nil

	case apierror.IsNotFound(err):
		fresh := &v1alpha1.ServiceMetadata{
			ObjectMeta: v1.ObjectMeta{
				Name:        name,
				Namespace:   namespace,
				Annotations: map[string]string{"updateTime": stamp},
			},
			Spec: istioextensionsv1alpha1.ServiceMetadata{
				ApplicationName: applicationName,
				Revision:        revision,
				MetadataInfo:    metadataInfo,
			},
		}
		created, createErr := api.Create(ctx, fresh, v1.CreateOptions{})
		if createErr != nil {
			return nil, errors.Wrap(createErr, "Failed to create metadata")
		}
		return created, nil

	default:
		return nil, errors.Wrap(err, "Failed to check metadata exists or not")
	}
}
// debounce hands the request channel, stopCh, the debounce options, the Push
// callback and the CommittedUpdates counter to the debounce helper.
func (s *ServiceMetadataServer) debounce(stopCh <-chan struct{}) {
s.debounceHelper.Debounce(s.ch, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}
// removeOutdatedCRD periodically deletes ServiceMetadata resources whose
// "updateTime" annotation is older than 24 hours. It runs until stopChan is
// signalled; Start launches it in its own goroutine.
//
// On every tick it lists all namespaces, lists the ServiceMetadata resources
// of each namespace, and for each resource:
//   - logs an error and skips it when it has no annotations at all;
//   - skips it silently when it has no "updateTime" annotation;
//   - logs an error and skips it when the annotation is not a base-10 integer;
//   - deletes it when the annotation (Unix seconds) is more than 24 hours old.
//
// Failures are logged and never abort the sweep.
func (s *ServiceMetadataServer) removeOutdatedCRD(stopChan <-chan struct{}) {
	const (
		// NOTE(review): this evaluates to 120 hours, far longer than maxAge;
		// possibly 2 hours was intended. Value kept as-is — confirm.
		sweepInterval = 2 * 60 * time.Hour
		// maxAge is how long a resource may go without an update before it is removed.
		maxAge = 24 * time.Hour
	)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()

	for {
		select {
		case <-stopChan:
			return
		case <-ticker.C:
		}

		namespaces, err := s.KubeClient.Kube().CoreV1().Namespaces().List(ctx, v1.ListOptions{})
		if err != nil {
			log.Errorf("Failed to get namespaces, %s", err.Error())
			continue
		}
		for _, namespace := range namespaces.Items {
			// A Namespace object is cluster-scoped: its own metadata.namespace is
			// empty, so the namespace to operate on is the object's name.
			nsName := namespace.GetName()
			client := s.KubeClient.Istio().ExtensionsV1alpha1().ServiceMetadatas(nsName)

			metadataList, err := client.List(ctx, v1.ListOptions{})
			if err != nil {
				log.Errorf("Failed to get metadata with namespace {%s}, %s", nsName, err.Error())
				continue
			}
			for _, metadata := range metadataList.Items {
				if metadata.Annotations == nil {
					log.Errorf("Failed to get metadata with namespace {%s}, metadata {%s}", nsName, metadata.GetName())
					continue
				}
				ts, ok := metadata.Annotations["updateTime"]
				if !ok {
					continue
				}
				tsInt, err := strconv.ParseInt(ts, 10, 64)
				if err != nil {
					log.Errorf("Failed to parse updateTime of metadata {%s} with namespace {%s}, %s", metadata.GetName(), nsName, err.Error())
					continue
				}
				if time.Since(time.Unix(tsInt, 0)) <= maxAge {
					continue
				}
				if err := client.Delete(ctx, metadata.GetName(), v1.DeleteOptions{}); err != nil {
					log.Errorf("Failed to delete outdated metadata with namespace {%s}, metadata {%s}, err {%s}", nsName, metadata.GetName(), err.Error())
				}
			}
		}
	}
}