| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package k8s |
| |
| import ( |
| "context" |
| "time" |
| ) |
| |
| import ( |
| "github.com/pkg/errors" |
| |
| kube_core "k8s.io/api/core/v1" |
| |
| kube_apierrs "k8s.io/apimachinery/pkg/api/errors" |
| kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1" |
| kube_runtime "k8s.io/apimachinery/pkg/runtime" |
| |
| kube_client "sigs.k8s.io/controller-runtime/pkg/client" |
| "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" |
| ) |
| |
| import ( |
| system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1" |
| config_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system" |
| core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model" |
| core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store" |
| common_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s" |
| ) |
| |
| var _ core_store.ResourceStore = &KubernetesStore{} |
| |
| const ( |
| configMapKey = "config" |
| ) |
| |
| type KubernetesStore struct { |
| client kube_client.Client |
| // Namespace to store ConfigMaps in, e.g. namespace where Control Plane is installed to |
| namespace string |
| converter common_k8s.Converter |
| scheme *kube_runtime.Scheme |
| } |
| |
| func NewStore(client kube_client.Client, namespace string, scheme *kube_runtime.Scheme, converter common_k8s.Converter) (core_store.ResourceStore, error) { |
| return &KubernetesStore{ |
| client: client, |
| namespace: namespace, |
| converter: converter, |
| scheme: scheme, |
| }, nil |
| } |
| |
| func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs ...core_store.CreateOptionsFunc) error { |
| configRes, ok := r.(*config_model.ConfigResource) |
| if !ok { |
| return newInvalidTypeError() |
| } |
| opts := core_store.NewCreateOptions(fs...) |
| cm := &kube_core.ConfigMap{ |
| TypeMeta: kube_meta.TypeMeta{ |
| Kind: "ConfigMap", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: kube_meta.ObjectMeta{ |
| Name: opts.Name, |
| Namespace: s.namespace, |
| }, |
| Immutable: nil, |
| Data: map[string]string{ |
| configMapKey: configRes.Spec.Config, |
| }, |
| } |
| if opts.Owner != nil { |
| k8sOwner, err := s.converter.ToKubernetesObject(opts.Owner) |
| if err != nil { |
| return errors.Wrap(err, "failed to convert core model into k8s counterpart") |
| } |
| if err := controllerutil.SetOwnerReference(k8sOwner, cm, s.scheme); err != nil { |
| return errors.Wrap(err, "failed to set owner reference for object") |
| } |
| } |
| if err := s.client.Create(ctx, cm); err != nil { |
| return err |
| } |
| r.SetMeta(&KubernetesMetaAdapter{cm.ObjectMeta}) |
| return nil |
| } |
| |
| func (s *KubernetesStore) Update(ctx context.Context, r core_model.Resource, fs ...core_store.UpdateOptionsFunc) error { |
| configRes, ok := r.(*config_model.ConfigResource) |
| if !ok { |
| return newInvalidTypeError() |
| } |
| cm := &kube_core.ConfigMap{ |
| TypeMeta: kube_meta.TypeMeta{ |
| Kind: "ConfigMap", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: r.GetMeta().(*KubernetesMetaAdapter).ObjectMeta, |
| Immutable: nil, |
| Data: map[string]string{ |
| configMapKey: configRes.Spec.Config, |
| }, |
| } |
| if err := s.client.Update(ctx, cm); err != nil { |
| if kube_apierrs.IsConflict(err) { |
| return core_store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh()) |
| } |
| return errors.Wrap(err, "failed to update k8s resource") |
| } |
| r.SetMeta(&KubernetesMetaAdapter{cm.ObjectMeta}) |
| return nil |
| } |
| |
| func (s *KubernetesStore) Delete(ctx context.Context, r core_model.Resource, fs ...core_store.DeleteOptionsFunc) error { |
| configRes, ok := r.(*config_model.ConfigResource) |
| if !ok { |
| return newInvalidTypeError() |
| } |
| opts := core_store.NewDeleteOptions(fs...) |
| cm := &kube_core.ConfigMap{ |
| TypeMeta: kube_meta.TypeMeta{ |
| Kind: "ConfigMap", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: kube_meta.ObjectMeta{ |
| Name: opts.Name, |
| Namespace: s.namespace, |
| }, |
| Immutable: nil, |
| Data: map[string]string{ |
| configMapKey: configRes.Spec.Config, |
| }, |
| } |
| return s.client.Delete(ctx, cm) |
| } |
| |
| func (s *KubernetesStore) Get(ctx context.Context, r core_model.Resource, fs ...core_store.GetOptionsFunc) error { |
| configRes, ok := r.(*config_model.ConfigResource) |
| if !ok { |
| return newInvalidTypeError() |
| } |
| opts := core_store.NewGetOptions(fs...) |
| cm := &kube_core.ConfigMap{} |
| if err := s.client.Get(ctx, kube_client.ObjectKey{Namespace: s.namespace, Name: opts.Name}, cm); err != nil { |
| if kube_apierrs.IsNotFound(err) { |
| return core_store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh) |
| } |
| return errors.Wrap(err, "failed to get k8s Config") |
| } |
| configRes.Spec.Config = cm.Data[configMapKey] |
| r.SetMeta(&KubernetesMetaAdapter{cm.ObjectMeta}) |
| return nil |
| } |
| |
| func (s *KubernetesStore) List(ctx context.Context, rs core_model.ResourceList, fs ...core_store.ListOptionsFunc) error { |
| configRes, ok := rs.(*config_model.ConfigResourceList) |
| if !ok { |
| return newInvalidTypeError() |
| } |
| cmlist := &kube_core.ConfigMapList{} |
| |
| if err := s.client.List(ctx, cmlist, kube_client.InNamespace(s.namespace)); err != nil { |
| return errors.Wrap(err, "failed to list k8s internal config") |
| } |
| for _, cm := range cmlist.Items { |
| configRes.Items = append(configRes.Items, &config_model.ConfigResource{ |
| Spec: &system_proto.Config{ |
| Config: cm.Data[configMapKey], |
| }, |
| Meta: &KubernetesMetaAdapter{cm.ObjectMeta}, |
| }) |
| } |
| return nil |
| } |
| |
| var _ core_model.ResourceMeta = &KubernetesMetaAdapter{} |
| |
| type KubernetesMetaAdapter struct { |
| kube_meta.ObjectMeta |
| } |
| |
| func (m *KubernetesMetaAdapter) GetNameExtensions() core_model.ResourceNameExtensions { |
| return common_k8s.ResourceNameExtensions(m.ObjectMeta.Namespace, m.ObjectMeta.Name) |
| } |
| |
| func (m *KubernetesMetaAdapter) GetVersion() string { |
| return m.ObjectMeta.GetResourceVersion() |
| } |
| |
| func (m *KubernetesMetaAdapter) GetMesh() string { |
| return "" |
| } |
| |
| func (m *KubernetesMetaAdapter) GetCreationTime() time.Time { |
| return m.GetObjectMeta().GetCreationTimestamp().Time |
| } |
| |
| func (m *KubernetesMetaAdapter) GetModificationTime() time.Time { |
| return m.GetObjectMeta().GetCreationTimestamp().Time |
| } |
| |
| func newInvalidTypeError() error { |
| return errors.New("resource has a wrong type") |
| } |