// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/asn1"
	"encoding/pem"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	apps "k8s.io/api/apps/v1"
	certv1beta1 "k8s.io/api/certificates/v1beta1"
	core "k8s.io/api/core/v1"
	apiequal "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubeclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"software.sslmate.com/src/go-pkcs12"

	operatorv1alpha1 "github.com/apache/skywalking-swck/apis/operator/v1alpha1"
	"github.com/apache/skywalking-swck/pkg/kubernetes"
)

// StorageReconciler reconciles a Storage object
type StorageReconciler struct {
	client.Client
	Log        logr.Logger
	Scheme     *runtime.Scheme
	FileRepo   kubernetes.Repo
	Recorder   record.EventRecorder
	RestConfig *rest.Config
}

// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=storages,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.skywalking.apache.org,resources=storages/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulset,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;clusterrolebindings,verbs=*

func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("Storage", req.NamespacedName)
	log.Info("=====================reconcile started================================")

	storage := operatorv1alpha1.Storage{}
	if err := r.Client.Get(ctx, req.NamespacedName, &storage); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if storage.Spec.ConnectType == "external" {
		return ctrl.Result{RequeueAfter: schedDuration}, nil
	}

	r.createCert(ctx, log, &storage)
	r.checkSecurity(ctx, log, &storage)

	ff, err := r.FileRepo.GetFilesRecursive(storage.Spec.Type + "/templates")
	if err != nil {
		log.Error(err, "failed to load resource templates")
		return ctrl.Result{}, err
	}
	app := kubernetes.Application{
		Client:   r.Client,
		FileRepo: r.FileRepo,
		CR:       &storage,
		GVK:      operatorv1alpha1.GroupVersion.WithKind("Storage"),
		Recorder: r.Recorder,
	}
	if err := app.ApplyAll(ctx, ff, log); err != nil {
		return ctrl.Result{}, err
	}
	if err := r.checkState(ctx, log, &storage); err != nil {
		log.Error(err, "failed to check sub resources state")
		return ctrl.Result{}, err
	}

	return ctrl.Result{RequeueAfter: schedDuration}, nil
}

func (r *StorageReconciler) checkState(ctx context.Context, log logr.Logger, storage *operatorv1alpha1.Storage) error {
	overlay := operatorv1alpha1.StorageStatus{}
	statefulset := apps.StatefulSet{}
	errCol := new(kubernetes.ErrorCollector)
	object := client.ObjectKey{Namespace: storage.Namespace, Name: storage.Name + "-" + storage.Spec.Type}
	if err := r.Client.Get(ctx, object, &statefulset); err != nil && !apierrors.IsNotFound(err) {
		errCol.Collect(fmt.Errorf("failed to get statefulset: %w", err))
	} else {
		overlay.Conditions = statefulset.Status.Conditions
	}

	if apiequal.Semantic.DeepDerivative(overlay, storage.Status) {
		log.Info("Status keeps the same as before")
	}
	storage.Status = overlay
	storage.Kind = "Storage"
	if err := kubernetes.ApplyOverlay(storage, &operatorv1alpha1.Storage{Status: overlay}); err != nil {
		errCol.Collect(fmt.Errorf("failed to apply overlay: %w", err))
		return errCol.Error()
	}
	if err := r.Status().Update(ctx, storage); err != nil {
		errCol.Collect(fmt.Errorf("failed to update status of es: %w", err))
	}
	log.Info("updated Status sub resource")
	return errCol.Error()
}

func (r *StorageReconciler) checkSecurity(ctx context.Context, log logr.Logger, s *operatorv1alpha1.Storage) {
	user, tls := s.Spec.Security.User, s.Spec.Security.TLS
	if user.SecretName != "" {
		if user.SecretName == "default" {
			s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "ELASTIC_USER", Value: "elastic"})
			s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "ELASTIC_PASSWORD", Value: "changeme"})
		} else {
			usersecret := core.Secret{}
			if err := r.Client.Get(ctx, client.ObjectKey{Namespace: s.Namespace, Name: user.SecretName}, &usersecret); err != nil && !apierrors.IsNotFound(err) {
				log.Info("fail get usersecret ")
			}
			for k, v := range usersecret.Data {
				if k == "username" {
					s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "ELASTIC_USER", Value: string(v)})
				} else if k == "password" {
					s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "ELASTIC_PASSWORD", Value: string(v)})
				}
			}
		}
	}
	if tls {
		s.Spec.ServiceName = "skywalking-storage"
	} else {
		s.Spec.ServiceName = s.Name + "-" + s.Spec.Type
	}
	if s.Spec.ResourceCnfig.Limit == "" && s.Spec.ResourceCnfig.Requests == "" {
		s.Spec.ResourceCnfig.Limit, s.Spec.ResourceCnfig.Requests = "1000m", "100m"
	}
	clusterInitialMasterNodes := s.Name + "-elasticsearch7-0" + "," + s.Name + "-elasticsearch7-1"
	esJavaOptsValue := "-Xms512m -Xmx512m"
	s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "discovery.seed_hosts", Value: s.Spec.ServiceName})
	s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "cluster.initial_master_nodes", Value: clusterInitialMasterNodes})
	s.Spec.Config = append(s.Spec.Config, core.EnvVar{Name: "ES_JAVA_OPTS", Value: esJavaOptsValue})
}

func (r *StorageReconciler) createCert(ctx context.Context, log logr.Logger, s *operatorv1alpha1.Storage) {
	clientset, err := kubeclient.NewForConfig(r.RestConfig)
	if err != nil {
		return
	}
	existSecret, err := clientset.CoreV1().Secrets(s.Namespace).Get(ctx, "skywalking-storage", metav1.GetOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		log.Info("fail get skywalking-storage secret")
		return
	}
	if existSecret.Name != "" {
		_, verifyCert, decodeErr := pkcs12.Decode(existSecret.Data["storage.p12"], "")
		if decodeErr != nil {
			log.Info("decode storage.p12 error")
			return
		}
		if time.Until(verifyCert.NotAfter).Hours() < 24 {
			log.Info("storage cert will expire,the storage cert will re-generate!")
		} else {
			return
		}
	}
	key, err := rsa.GenerateKey(rand.Reader, 4096)
	if err != nil {
		log.Info("fail generate privatekey")
		return
	}
	subj := pkix.Name{
		CommonName:         "skywalking-storage",
		Country:            []string{"CN"},
		Province:           []string{"ZJ"},
		Locality:           []string{"HZ"},
		Organization:       []string{"Skywalking"},
		OrganizationalUnit: []string{"Skywalking"},
	}
	asn1Subj, err := asn1.Marshal(subj.ToRDNSequence())
	if err != nil {
		return
	}
	template := x509.CertificateRequest{
		RawSubject:         asn1Subj,
		SignatureAlgorithm: x509.SHA256WithRSA,
	}
	csrBytes, err := x509.CreateCertificateRequest(rand.Reader, &template, key)
	if err != nil {
		log.Info("fail create certificaterequest")
		return
	}
	buffer := new(bytes.Buffer)
	err = pem.Encode(buffer, &pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrBytes})
	if err != nil {
		log.Info("fail encode CERTIFICATE REQUEST")
		return
	}
	singername := "kubernetes.io/kube-apiserver-client"
	request := certv1beta1.CertificateSigningRequest{
		TypeMeta: metav1.TypeMeta{
			Kind:       "CertificateSigningRequest",
			APIVersion: "certificates.k8s.io/v1beta1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "storage-csr",
		},
		Spec: certv1beta1.CertificateSigningRequestSpec{
			Groups:     []string{"system:authenticated"},
			Request:    buffer.Bytes(),
			SignerName: &singername,
			Usages:     []certv1beta1.KeyUsage{certv1beta1.UsageClientAuth},
		},
	}
	err = clientset.CertificatesV1beta1().CertificateSigningRequests().Delete(ctx, "storage-csr", metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		log.Info("fail delete csr")
		return
	}
	csr, err := clientset.CertificatesV1beta1().CertificateSigningRequests().Create(ctx, &request, metav1.CreateOptions{})
	if err != nil {
		log.Info("fail create csr")
		return
	}
	condition := certv1beta1.CertificateSigningRequestCondition{
		Type:    "Approved",
		Reason:  "ApprovedBySkywalkingStorage",
		Message: "Approved by skywalking storage controller",
	}
	csr.Status.Conditions = append(csr.Status.Conditions, condition)
	updateapproval, err := clientset.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{})
	if err != nil {
		log.Info("fail update approval:", updateapproval)
		return
	}
	for {
		csr, err = clientset.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, "storage-csr", metav1.GetOptions{})
		if err != nil {
			log.Info("fail get storage-csr")
			return
		}
		if csr.Status.Certificate != nil {
			break
		}
	}
	block, _ := pem.Decode(csr.Status.Certificate)
	cert, err := x509.ParseCertificate(block.Bytes)

	if err != nil {
		log.Info("fail parse certificate")
		return
	}
	p12, err := pkcs12.Encode(rand.Reader, key, cert, nil, "")

	if err != nil {
		log.Info("fail encode pkcs12")
		return
	}
	err = clientset.CoreV1().Secrets(s.Namespace).Delete(ctx, "skywalking-storage", metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		log.Info("fail delete secret skywalking-storage")
		return
	}
	data := make(map[string][]byte)
	data["storage.p12"] = p12
	secret := core.Secret{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Secret",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "skywalking-storage",
			Namespace: s.Namespace,
		},
		Data: data,
		Type: "Opaque",
	}
	storageTLSSecret, err := clientset.CoreV1().Secrets(s.Namespace).Create(ctx, &secret, metav1.CreateOptions{})
	if err != nil {
		log.Info("fail create secret skywalking-storage")
		return
	}
	log.Info("success create secret skywalking-storage", storageTLSSecret.Name)
}

func (r *StorageReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&operatorv1alpha1.Storage{}).
		Owns(&core.Service{}).
		Complete(r)
}
