blob: 2490530d1eb5fb83f7ab0328d70fea985d1138b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package k8s
import (
"context"
"strings"
"time"
)
import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
import (
"github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
k8s_common "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s"
k8s_model "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/model"
k8s_registry "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/registry"
util_k8s "github.com/apache/dubbo-kubernetes/pkg/util/k8s"
)
// typeIsUnregistered reports whether err, or any error it wraps, is a
// *k8s_registry.UnknownTypeError.
func typeIsUnregistered(err error) bool {
	unknownType := new(*k8s_registry.UnknownTypeError)
	return errors.As(err, unknownType)
}
// Compile-time assertion that *KubernetesStore satisfies store.ResourceStore.
var _ store.ResourceStore = &KubernetesStore{}
// KubernetesStore is a store.ResourceStore implementation that keeps core
// resources as Kubernetes objects.
type KubernetesStore struct {
// Client issues the Create/Update/Delete/Get/List calls made by the store.
Client kube_client.Client
// Converter translates between core resources (and lists) and their
// Kubernetes counterparts.
Converter k8s_common.Converter
// Scheme is handed to controllerutil.SetOwnerReference when Create is
// given an owner.
Scheme *kube_runtime.Scheme
}
// NewStore returns a KubernetesStore wired with the given client, scheme and
// converter, exposed as a store.ResourceStore. The returned error is always nil.
func NewStore(client kube_client.Client, scheme *kube_runtime.Scheme, converter k8s_common.Converter) (store.ResourceStore, error) {
	k8sStore := &KubernetesStore{}
	k8sStore.Client = client
	k8sStore.Scheme = scheme
	k8sStore.Converter = converter
	return k8sStore, nil
}
// Create converts r into its Kubernetes counterpart and creates it through the
// store's client.
//
// Labels, mesh and owner come from the create options; the Kubernetes name and
// namespace are derived from the option name and the object's scope via
// k8sNameNamespace. After a successful create, r is refreshed from the created
// object. When the client reports that the object already exists, a warning is
// logged and nil is returned without refreshing r.
func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs ...store.CreateOptionsFunc) error {
	createOpts := store.NewCreateOptions(fs...)

	k8sObj, convErr := s.Converter.ToKubernetesObject(r)
	if convErr != nil {
		if !typeIsUnregistered(convErr) {
			return errors.Wrap(convErr, "failed to convert core model into k8s counterpart")
		}
		return errors.Errorf("cannot create instance of unregistered type %q", r.Descriptor().Name)
	}

	name, namespace, nameErr := k8sNameNamespace(createOpts.Name, k8sObj.Scope())
	if nameErr != nil {
		return nameErr
	}

	// Populate the object's metadata from the create options.
	k8sObj.GetObjectMeta().SetLabels(createOpts.Labels)
	k8sObj.SetMesh(createOpts.Mesh)
	k8sObj.GetObjectMeta().SetName(name)
	k8sObj.GetObjectMeta().SetNamespace(namespace)

	if owner := createOpts.Owner; owner != nil {
		k8sOwner, ownerErr := s.Converter.ToKubernetesObject(owner)
		if ownerErr != nil {
			return errors.Wrap(ownerErr, "failed to convert core model into k8s counterpart")
		}
		if refErr := controllerutil.SetOwnerReference(k8sOwner, k8sObj, s.Scheme); refErr != nil {
			return errors.Wrap(refErr, "failed to set owner reference for object")
		}
	}

	if createErr := s.Client.Create(ctx, k8sObj); createErr != nil {
		if !kube_apierrs.IsAlreadyExists(createErr) {
			return errors.Wrap(createErr, "failed to create k8s resource")
		}
		// The resource already exists, so simply report success.
		logger.Sugar().Warn("资源已经存在了")
		return nil
	}

	// Copy the created object back into r.
	if backErr := s.Converter.ToCoreResource(k8sObj, r); backErr != nil {
		return errors.Wrap(backErr, "failed to convert k8s model into core counterpart")
	}
	return nil
}
// Update converts r into its Kubernetes counterpart and updates it through the
// store's client.
//
// Labels are taken from the update options, while the mesh is taken from r's
// own metadata. When the client reports a conflict, the error is translated to
// store.ErrorResourceConflict. After a successful update, r is refreshed from
// the updated object.
func (s *KubernetesStore) Update(ctx context.Context, r core_model.Resource, fs ...store.UpdateOptionsFunc) error {
	updateOpts := store.NewUpdateOptions(fs...)

	k8sObj, convErr := s.Converter.ToKubernetesObject(r)
	if convErr != nil {
		if !typeIsUnregistered(convErr) {
			return errors.Wrapf(convErr, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
		}
		return errors.Errorf("cannot update instance of unregistered type %q", r.Descriptor().Name)
	}

	k8sObj.GetObjectMeta().SetLabels(updateOpts.Labels)
	k8sObj.SetMesh(r.GetMeta().GetMesh())

	if updateErr := s.Client.Update(ctx, k8sObj); updateErr != nil {
		if !kube_apierrs.IsConflict(updateErr) {
			return errors.Wrap(updateErr, "failed to update k8s resource")
		}
		return store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh())
	}

	// Copy the updated object back into r.
	if backErr := s.Converter.ToCoreResource(k8sObj, r); backErr != nil {
		return errors.Wrap(backErr, "failed to convert k8s model into core counterpart")
	}
	return nil
}
// Delete removes the Kubernetes object behind the resource identified by the
// delete options.
//
// The resource is first loaded into r through Get using the option name and
// mesh; any error Get returns is passed through unchanged. A not-found
// response from the delete call itself is treated as success.
func (s *KubernetesStore) Delete(ctx context.Context, r core_model.Resource, fs ...store.DeleteOptionsFunc) error {
	deleteOpts := store.NewDeleteOptions(fs...)

	// Load the object first; Get also checks that it belongs to the requested mesh.
	if getErr := s.Get(ctx, r, store.GetByKey(deleteOpts.Name, deleteOpts.Mesh)); getErr != nil {
		return getErr
	}

	k8sObj, convErr := s.Converter.ToKubernetesObject(r)
	if convErr != nil {
		if !typeIsUnregistered(convErr) {
			return errors.Wrapf(convErr, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
		}
		// An unregistered type cannot have any stored instance, so there is
		// nothing to delete and the operation counts as successful.
		return nil
	}

	name, namespace, nameErr := k8sNameNamespace(deleteOpts.Name, k8sObj.Scope())
	if nameErr != nil {
		return nameErr
	}
	k8sObj.GetObjectMeta().SetName(name)
	k8sObj.GetObjectMeta().SetNamespace(namespace)

	deleteErr := s.Client.Delete(ctx, k8sObj)
	if deleteErr == nil || kube_apierrs.IsNotFound(deleteErr) {
		return nil
	}
	return errors.Wrap(deleteErr, "failed to delete k8s resource")
}
// Get loads the resource identified by the get options into r.
//
// It returns store.ErrorResourceNotFound when r's type is unregistered, when
// the client reports the object as not found, or when the loaded resource's
// mesh differs from the requested one. It returns store.ErrorResourceConflict
// when a version is requested and the loaded resource's version differs.
func (s *KubernetesStore) Get(ctx context.Context, r core_model.Resource, fs ...store.GetOptionsFunc) error {
	getOpts := store.NewGetOptions(fs...)

	// Every "does not exist" outcome below reports the same error.
	notFound := func() error {
		return store.ErrorResourceNotFound(r.Descriptor().Name, getOpts.Name, getOpts.Mesh)
	}

	k8sObj, convErr := s.Converter.ToKubernetesObject(r)
	if convErr != nil {
		if typeIsUnregistered(convErr) {
			return notFound()
		}
		return errors.Wrapf(convErr, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
	}

	name, namespace, nameErr := k8sNameNamespace(getOpts.Name, k8sObj.Scope())
	if nameErr != nil {
		return nameErr
	}

	key := kube_client.ObjectKey{Name: name, Namespace: namespace}
	if getErr := s.Client.Get(ctx, key, k8sObj); getErr != nil {
		if kube_apierrs.IsNotFound(getErr) {
			return notFound()
		}
		return errors.Wrap(getErr, "failed to get k8s resource")
	}

	if backErr := s.Converter.ToCoreResource(k8sObj, r); backErr != nil {
		return errors.Wrap(backErr, "failed to convert k8s model into core counterpart")
	}

	// The version check runs before the mesh check, so a resource failing both
	// is reported as a conflict.
	switch {
	case getOpts.Version != "" && r.GetMeta().GetVersion() != getOpts.Version:
		return store.ErrorResourceConflict(r.Descriptor().Name, getOpts.Name, getOpts.Mesh)
	case r.GetMeta().GetMesh() != getOpts.Mesh:
		return notFound()
	default:
		return nil
	}
}
// List fetches all Kubernetes objects of rs's item type through the client,
// converts those matching the list options into core resources, and appends
// them to rs.
//
// A resource matches when the options' mesh is empty or equal to the
// resource's mesh, and the options' NameContains is empty or a substring of
// the resource's name. The pagination total of rs is set to the number of
// matching resources. An unregistered item type yields an untouched rs and a
// nil error.
func (s *KubernetesStore) List(ctx context.Context, rs core_model.ResourceList, fs ...store.ListOptionsFunc) error {
	listOpts := store.NewListOptions(fs...)

	k8sList, convErr := s.Converter.ToKubernetesList(rs)
	if convErr != nil {
		if typeIsUnregistered(convErr) {
			return nil
		}
		return errors.Wrapf(convErr, "failed to convert core list model of type %s into k8s counterpart", rs.GetItemType())
	}

	if listErr := s.Client.List(ctx, k8sList); listErr != nil {
		return errors.Wrap(listErr, "failed to list k8s resources")
	}

	// Filtering by mesh and name happens here, after the client call.
	keep := func(res core_model.Resource) bool {
		return (listOpts.Mesh == "" || res.GetMeta().GetMesh() == listOpts.Mesh) &&
			(listOpts.NameContains == "" || strings.Contains(res.GetMeta().GetName(), listOpts.NameContains))
	}

	matched, newErr := registry.Global().NewList(rs.GetItemType())
	if newErr != nil {
		return newErr
	}
	if backErr := s.Converter.ToCoreList(k8sList, matched, keep); backErr != nil {
		return errors.Wrap(backErr, "failed to convert k8s model into core counterpart")
	}

	items := matched.GetItems()
	for i := range items {
		// The AddItem error is deliberately discarded. Presumably it can only
		// fail on an item-type mismatch, which should not occur because
		// matched was created from rs.GetItemType() — TODO confirm.
		_ = rs.AddItem(items[i])
	}
	rs.GetPagination().SetTotal(uint32(len(items)))
	return nil
}
// k8sNameNamespace maps a core resource name to the Kubernetes name and
// namespace for the given scope.
//
// Cluster-scoped names are returned as-is with an empty namespace.
// Namespace-scoped names are split by util_k8s.CoreNameToK8sName. An empty
// name, or a name CoreNameToK8sName rejects, yields a
// store.PreconditionFormatError; any other scope yields an "unknown scope"
// error.
func k8sNameNamespace(coreName string, scope k8s_model.Scope) (string, string, error) {
	if coreName == "" {
		return "", "", store.PreconditionFormatError("name can't be empty")
	}
	if scope == k8s_model.ScopeCluster {
		return coreName, "", nil
	}
	if scope != k8s_model.ScopeNamespace {
		return "", "", errors.Errorf("unknown scope %s", scope)
	}

	k8sName, k8sNamespace, splitErr := util_k8s.CoreNameToK8sName(coreName)
	if splitErr != nil {
		return "", "", store.PreconditionFormatError(splitErr.Error())
	}
	return k8sName, k8sNamespace, nil
}
// Compile-time assertion that *KubernetesMetaAdapter satisfies
// core_model.ResourceMeta.
var _ core_model.ResourceMeta = &KubernetesMetaAdapter{}
// KubernetesMetaAdapter exposes a Kubernetes ObjectMeta, together with a mesh
// name, through the core_model.ResourceMeta interface.
type KubernetesMetaAdapter struct {
// ObjectMeta is the embedded Kubernetes object metadata the accessors read.
kube_meta.ObjectMeta
// Mesh is the mesh name returned by GetMesh.
Mesh string
}
// GetName returns the core name of the object. Namespaced objects get the
// name produced by util_k8s.K8sNamespacedNameToCoreName; objects with an empty
// namespace are treated as cluster-scoped and keep their plain Kubernetes name.
func (m *KubernetesMetaAdapter) GetName() string {
	name, namespace := m.ObjectMeta.Name, m.ObjectMeta.Namespace
	if namespace != "" {
		return util_k8s.K8sNamespacedNameToCoreName(name, namespace)
	}
	// Empty namespace: cluster-scoped object.
	return name
}
// GetNameExtensions returns the name extensions that
// k8s_common.ResourceNameExtensions builds from the object's Kubernetes
// namespace and name.
func (m *KubernetesMetaAdapter) GetNameExtensions() core_model.ResourceNameExtensions {
	objMeta := &m.ObjectMeta
	return k8s_common.ResourceNameExtensions(objMeta.Namespace, objMeta.Name)
}
// GetVersion returns the Kubernetes resource version of the object.
func (m *KubernetesMetaAdapter) GetVersion() string {
	return m.ObjectMeta.ResourceVersion
}
// GetMesh returns the mesh name stored in the adapter's Mesh field.
func (m *KubernetesMetaAdapter) GetMesh() string {
return m.Mesh
}
// GetCreationTime returns the object's Kubernetes creation timestamp.
func (m *KubernetesMetaAdapter) GetCreationTime() time.Time {
	created := m.ObjectMeta.GetCreationTimestamp()
	return created.Time
}
// GetModificationTime returns the object's Kubernetes creation timestamp.
//
// NOTE(review): no separate modification time is read here, so the value is
// identical to GetCreationTime — presumably because ObjectMeta carries no
// dedicated modification timestamp; confirm this is intended.
func (m *KubernetesMetaAdapter) GetModificationTime() time.Time {
	created := m.ObjectMeta.GetCreationTimestamp()
	return created.Time
}
// GetLabels returns a copy of the object's labels, never nil, augmented with:
//   - v1alpha1.DisplayName set to the Kubernetes object name, unless the
//     label is already present;
//   - v1alpha1.KubeNamespaceTag set to the object's namespace, when the
//     namespace is non-empty (overwriting any existing value).
//
// The labels stored on the object itself are not modified.
func (m *KubernetesMetaAdapter) GetLabels() map[string]string {
	result := maps.Clone(m.GetObjectMeta().GetLabels())
	if result == nil {
		// Cloning a nil map yields nil; writing to it below would panic.
		result = make(map[string]string)
	}
	if _, present := result[v1alpha1.DisplayName]; !present {
		result[v1alpha1.DisplayName] = m.GetObjectMeta().GetName()
	}
	if ns := m.Namespace; ns != "" {
		result[v1alpha1.KubeNamespaceTag] = ns
	}
	return result
}
// KubeFactory creates the Kubernetes counterparts of core resources and core
// resource lists.
type KubeFactory interface {
// NewObject returns a Kubernetes object for the core resource r.
NewObject(r core_model.Resource) (k8s_model.KubernetesObject, error)
// NewList returns a Kubernetes list for the core resource list rl.
NewList(rl core_model.ResourceList) (k8s_model.KubernetesList, error)
}
// Compile-time assertion that *SimpleKubeFactory satisfies KubeFactory.
var _ KubeFactory = &SimpleKubeFactory{}
// SimpleKubeFactory is a KubeFactory that delegates to a type registry.
type SimpleKubeFactory struct {
// KubeTypes is the registry asked for new objects and lists, keyed by a
// resource's spec.
KubeTypes k8s_registry.TypeRegistry
}
// NewObject asks the type registry for a new Kubernetes object matching the
// spec of r, returning the registry's result and error unchanged.
func (f *SimpleKubeFactory) NewObject(r core_model.Resource) (k8s_model.KubernetesObject, error) {
	spec := r.GetSpec()
	return f.KubeTypes.NewObject(spec)
}
// NewList asks the type registry for a new Kubernetes list matching the spec
// of a fresh item of rl, returning the registry's result and error unchanged.
func (f *SimpleKubeFactory) NewList(rl core_model.ResourceList) (k8s_model.KubernetesList, error) {
	item := rl.NewItem()
	return f.KubeTypes.NewList(item.GetSpec())
}