/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package k8s

import (
	"context"
	"time"
)

import (
	"github.com/pkg/errors"

	kube_core "k8s.io/api/core/v1"

	kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
	kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	kube_runtime "k8s.io/apimachinery/pkg/runtime"

	kube_client "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

import (
	system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
	config_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
	common_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s"
)

// Compile-time check that *KubernetesStore satisfies core_store.ResourceStore.
var _ core_store.ResourceStore = (*KubernetesStore)(nil)

// configMapKey is the key inside ConfigMap.Data under which the config
// payload of a ConfigResource is stored.
const configMapKey = "config"

// KubernetesStore is a core_store.ResourceStore that keeps each
// ConfigResource in a Kubernetes ConfigMap.
//
// client performs the Create/Update/Delete/Get/List calls, converter turns an
// owner resource into its Kubernetes object, and scheme is handed to
// controllerutil.SetOwnerReference when an owner is set on Create.
type KubernetesStore struct {
	client kube_client.Client
	// Namespace in which the ConfigMaps are stored, e.g. the namespace where the Control Plane is installed.
	namespace string
	converter common_k8s.Converter
	scheme    *kube_runtime.Scheme
}

// NewStore builds a ConfigMap-backed resource store that operates in the given
// namespace. The returned error is always nil.
func NewStore(client kube_client.Client, namespace string, scheme *kube_runtime.Scheme, converter common_k8s.Converter) (core_store.ResourceStore, error) {
	store := new(KubernetesStore)
	store.client = client
	store.namespace = namespace
	store.converter = converter
	store.scheme = scheme
	return store, nil
}

// Create stores the given ConfigResource as a new ConfigMap named after the
// create options, in the store's namespace. When the options carry an owner,
// an owner reference to its Kubernetes counterpart is attached first. On
// success the resource's meta is replaced with the created ConfigMap's
// ObjectMeta.
//
// It returns an invalid-type error when r is not a *ConfigResource, a wrapped
// error when the owner cannot be converted or referenced, and the client's
// error unchanged when the create call itself fails.
func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs ...core_store.CreateOptionsFunc) error {
	cfg, isConfig := r.(*config_model.ConfigResource)
	if !isConfig {
		return newInvalidTypeError()
	}
	createOpts := core_store.NewCreateOptions(fs...)

	configMap := &kube_core.ConfigMap{}
	configMap.Kind = "ConfigMap"
	configMap.APIVersion = "v1"
	configMap.Name = createOpts.Name
	configMap.Namespace = s.namespace
	configMap.Data = map[string]string{configMapKey: cfg.Spec.Config}

	if owner := createOpts.Owner; owner != nil {
		ownerObj, err := s.converter.ToKubernetesObject(owner)
		if err != nil {
			return errors.Wrap(err, "failed to convert core model into k8s counterpart")
		}
		err = controllerutil.SetOwnerReference(ownerObj, configMap, s.scheme)
		if err != nil {
			return errors.Wrap(err, "failed to set owner reference for object")
		}
	}

	err := s.client.Create(ctx, configMap)
	if err != nil {
		return err
	}
	// Expose the ObjectMeta of the created ConfigMap to the caller.
	r.SetMeta(&KubernetesMetaAdapter{ObjectMeta: configMap.ObjectMeta})
	return nil
}

// Update writes the config payload of the given ConfigResource to its backing
// ConfigMap. The ConfigMap is addressed by the ObjectMeta carried in the
// resource's meta, which must be a *KubernetesMetaAdapter (as set by Create,
// Get and List of this store). On success the resource's meta is replaced with
// the ObjectMeta of the updated ConfigMap.
//
// It returns an invalid-type error when r is not a *ConfigResource, an error
// when the resource meta is missing or not a *KubernetesMetaAdapter, a
// resource-conflict error when the API server reports a conflict, and a
// wrapped error for any other client failure.
func (s *KubernetesStore) Update(ctx context.Context, r core_model.Resource, fs ...core_store.UpdateOptionsFunc) error {
	configRes, ok := r.(*config_model.ConfigResource)
	if !ok {
		return newInvalidTypeError()
	}
	// Checked assertion: a resource that did not come from this store (nil or
	// foreign meta) must yield an error instead of a panic.
	meta, ok := r.GetMeta().(*KubernetesMetaAdapter)
	if !ok || meta == nil {
		return errors.Errorf("resource meta has a wrong type: expected *KubernetesMetaAdapter, got %T", r.GetMeta())
	}
	cm := &kube_core.ConfigMap{
		TypeMeta: kube_meta.TypeMeta{
			Kind:       "ConfigMap",
			APIVersion: "v1",
		},
		ObjectMeta: meta.ObjectMeta,
		Immutable:  nil,
		Data: map[string]string{
			configMapKey: configRes.Spec.Config,
		},
	}
	if err := s.client.Update(ctx, cm); err != nil {
		if kube_apierrs.IsConflict(err) {
			return core_store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh())
		}
		return errors.Wrap(err, "failed to update k8s resource")
	}
	r.SetMeta(&KubernetesMetaAdapter{cm.ObjectMeta})
	return nil
}

// Delete removes the ConfigMap named by the delete options from the store's
// namespace.
//
// The ConfigMap is identified by name and namespace only, so the resource's
// Spec is not read and a ConfigResource with a nil Spec can be deleted.
//
// It returns an invalid-type error when r is not a *ConfigResource; otherwise
// it returns whatever the client's Delete call returns.
func (s *KubernetesStore) Delete(ctx context.Context, r core_model.Resource, fs ...core_store.DeleteOptionsFunc) error {
	if _, ok := r.(*config_model.ConfigResource); !ok {
		return newInvalidTypeError()
	}
	opts := core_store.NewDeleteOptions(fs...)
	cm := &kube_core.ConfigMap{
		TypeMeta: kube_meta.TypeMeta{
			Kind:       "ConfigMap",
			APIVersion: "v1",
		},
		ObjectMeta: kube_meta.ObjectMeta{
			Name:      opts.Name,
			Namespace: s.namespace,
		},
	}
	return s.client.Delete(ctx, cm)
}

// Get loads the ConfigMap named by the get options from the store's namespace
// and copies its config payload and ObjectMeta into the given ConfigResource.
//
// It returns an invalid-type error when r is not a *ConfigResource, a
// resource-not-found error when the ConfigMap does not exist, and a wrapped
// error for any other client failure.
func (s *KubernetesStore) Get(ctx context.Context, r core_model.Resource, fs ...core_store.GetOptionsFunc) error {
	cfg, isConfig := r.(*config_model.ConfigResource)
	if !isConfig {
		return newInvalidTypeError()
	}
	getOpts := core_store.NewGetOptions(fs...)

	key := kube_client.ObjectKey{Namespace: s.namespace, Name: getOpts.Name}
	configMap := &kube_core.ConfigMap{}
	err := s.client.Get(ctx, key, configMap)
	if err != nil && kube_apierrs.IsNotFound(err) {
		return core_store.ErrorResourceNotFound(r.Descriptor().Name, getOpts.Name, getOpts.Mesh)
	}
	if err != nil {
		return errors.Wrap(err, "failed to get k8s Config")
	}

	cfg.Spec.Config = configMap.Data[configMapKey]
	r.SetMeta(&KubernetesMetaAdapter{ObjectMeta: configMap.ObjectMeta})
	return nil
}

// List appends one ConfigResource to rs for every ConfigMap found in the
// store's namespace. Each resource carries the value stored under
// configMapKey and the ConfigMap's ObjectMeta. The list options in fs are not
// consulted.
//
// It returns an invalid-type error when rs is not a *ConfigResourceList and a
// wrapped error when the client's List call fails.
func (s *KubernetesStore) List(ctx context.Context, rs core_model.ResourceList, fs ...core_store.ListOptionsFunc) error {
	resources, isConfigList := rs.(*config_model.ConfigResourceList)
	if !isConfigList {
		return newInvalidTypeError()
	}

	configMaps := &kube_core.ConfigMapList{}
	err := s.client.List(ctx, configMaps, kube_client.InNamespace(s.namespace))
	if err != nil {
		return errors.Wrap(err, "failed to list k8s internal config")
	}

	for i := range configMaps.Items {
		item := &configMaps.Items[i]
		res := &config_model.ConfigResource{
			Spec: &system_proto.Config{Config: item.Data[configMapKey]},
			Meta: &KubernetesMetaAdapter{ObjectMeta: item.ObjectMeta},
		}
		resources.Items = append(resources.Items, res)
	}
	return nil
}

// Compile-time check that *KubernetesMetaAdapter satisfies core_model.ResourceMeta.
var _ core_model.ResourceMeta = (*KubernetesMetaAdapter)(nil)

// KubernetesMetaAdapter exposes a Kubernetes ObjectMeta through the
// core_model.ResourceMeta interface. The embedded ObjectMeta supplies the
// underlying values and its promoted accessors.
type KubernetesMetaAdapter struct {
	kube_meta.ObjectMeta
}

// GetNameExtensions returns the name extensions built from the object's
// Kubernetes namespace and name.
func (m *KubernetesMetaAdapter) GetNameExtensions() core_model.ResourceNameExtensions {
	namespace, name := m.ObjectMeta.Namespace, m.ObjectMeta.Name
	return common_k8s.ResourceNameExtensions(namespace, name)
}

// GetVersion returns the Kubernetes resourceVersion of the object.
func (m *KubernetesMetaAdapter) GetVersion() string {
	return m.ObjectMeta.ResourceVersion
}

// GetMesh always returns the empty string for this adapter.
func (m *KubernetesMetaAdapter) GetMesh() string {
	var mesh string
	return mesh
}

// GetCreationTime returns the object's Kubernetes creation timestamp.
func (m *KubernetesMetaAdapter) GetCreationTime() time.Time {
	created := m.ObjectMeta.CreationTimestamp
	return created.Time
}

// GetModificationTime returns the object's creation timestamp; this adapter
// reports the same instant for creation and modification.
// NOTE(review): presumably because ObjectMeta has no dedicated
// last-modified field — confirm this is intended.
func (m *KubernetesMetaAdapter) GetModificationTime() time.Time {
	return m.ObjectMeta.CreationTimestamp.Time
}

// newInvalidTypeError builds the error returned when a store method is handed
// a resource (or resource list) of an unexpected type.
func newInvalidTypeError() error {
	const msg = "resource has a wrong type"
	return errors.New(msg)
}
