| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package util |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| solr "github.com/apache/solr-operator/api/v1beta1" |
| "github.com/apache/solr-operator/controllers/util/solr_api" |
| "github.com/go-logr/logr" |
| "github.com/robfig/cron/v3" |
| corev1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/rest" |
| "k8s.io/client-go/tools/remotecommand" |
| "net/url" |
| "strconv" |
| "time" |
| ) |
| |
| func GetBackupRepositoryByName(backupRepos []solr.SolrBackupRepository, repositoryName string) *solr.SolrBackupRepository { |
| // If no name is given and only 1 repo exists, return the repo |
| if repositoryName == "" && len(backupRepos) == 1 { |
| return &backupRepos[0] |
| } |
| //Build map of string->BackupRepository |
| for _, repo := range backupRepos { |
| if repo.Name == repositoryName { |
| return &repo |
| } |
| } |
| return nil |
| } |
| |
| func FullCollectionBackupName(collection string, backupName string) string { |
| return fmt.Sprintf("%s-%s", backupName, collection) |
| } |
| |
| func AsyncIdForCollectionBackup(collection string, backupName string) string { |
| return fmt.Sprintf("%s-%s", backupName, collection) |
| } |
| |
| func UpdateStatusOfCollectionBackups(backupStatus *solr.IndividualSolrBackupStatus) (allFinished bool) { |
| // Check if all collection backups have been completed, this is updated in the loop |
| allFinished = len(backupStatus.CollectionBackupStatuses) > 0 |
| |
| allSuccessful := len(backupStatus.CollectionBackupStatuses) > 0 |
| |
| for _, collectionStatus := range backupStatus.CollectionBackupStatuses { |
| allFinished = allFinished && collectionStatus.Finished |
| allSuccessful = allSuccessful && (collectionStatus.Successful != nil && *collectionStatus.Successful) |
| } |
| |
| backupStatus.Finished = allFinished |
| if allFinished && backupStatus.Successful == nil { |
| backupStatus.Successful = &allSuccessful |
| } |
| return |
| } |
| |
| func GenerateQueryParamsForBackup(backupRepository *solr.SolrBackupRepository, backup *solr.SolrBackup, collection string) url.Values { |
| queryParams := url.Values{} |
| queryParams.Add("action", "BACKUP") |
| queryParams.Add("collection", collection) |
| queryParams.Add("name", FullCollectionBackupName(collection, backup.Name)) |
| queryParams.Add("async", AsyncIdForCollectionBackup(collection, backup.Name)) |
| queryParams.Add("location", BackupLocationPath(backupRepository, backup.Spec.Location)) |
| queryParams.Add("repository", backupRepository.Name) |
| |
| if backup.Spec.Recurrence.IsEnabled() { |
| queryParams.Add("maxNumBackupPoints", strconv.Itoa(backup.Spec.Recurrence.MaxSaved)) |
| } |
| |
| return queryParams |
| } |
| |
| func StartBackupForCollection(ctx context.Context, cloud *solr.SolrCloud, backupRepository *solr.SolrBackupRepository, backup *solr.SolrBackup, collection string, logger logr.Logger) (success bool, err error) { |
| queryParams := GenerateQueryParamsForBackup(backupRepository, backup, collection) |
| resp := &solr_api.SolrAsyncResponse{} |
| |
| logger.Info("Calling to start collection backup", "solrCloud", cloud.Name, "collection", collection) |
| err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, resp) |
| |
| if err == nil { |
| if resp.ResponseHeader.Status == 0 { |
| success = true |
| } |
| } else { |
| logger.Error(err, "Error starting collection backup", "solrCloud", cloud.Name, "collection", collection) |
| } |
| |
| return success, err |
| } |
| |
| func CheckBackupForCollection(ctx context.Context, cloud *solr.SolrCloud, collection string, backupName string, logger logr.Logger) (finished bool, success bool, asyncStatus string, err error) { |
| logger.Info("Calling to check on collection backup", "solrCloud", cloud.Name, "collection", collection) |
| |
| var message string |
| asyncStatus, message, err = solr_api.CheckAsyncRequest(ctx, cloud, AsyncIdForCollectionBackup(collection, backupName)) |
| |
| if err == nil { |
| if asyncStatus == "completed" { |
| finished = true |
| success = true |
| } |
| if asyncStatus == "failed" { |
| finished = true |
| success = false |
| } |
| } else { |
| logger.Error(err, "Error checking on collection backup", "solrCloud", cloud.Name, "collection", collection, "message", message) |
| } |
| |
| return finished, success, asyncStatus, err |
| } |
| |
| func DeleteAsyncInfoForBackup(ctx context.Context, cloud *solr.SolrCloud, collection string, backupName string, logger logr.Logger) (err error) { |
| logger.Info("Calling to delete async info for backup command.", "solrCloud", cloud.Name, "collection", collection) |
| var message string |
| message, err = solr_api.DeleteAsyncRequest(ctx, cloud, AsyncIdForCollectionBackup(collection, backupName)) |
| |
| if err != nil { |
| logger.Error(err, "Error deleting async data for collection backup", "solrCloud", cloud.Name, "collection", collection, "message", message) |
| } |
| |
| return err |
| } |
| |
| func EnsureDirectoryForBackup(solrCloud *solr.SolrCloud, backupRepository *solr.SolrBackupRepository, backup *solr.SolrBackup, config *rest.Config) (err error) { |
| // Directory creation only required/possible for volume (i.e. local) backups |
| if IsRepoVolume(backupRepository) { |
| backupPath := BackupLocationPath(backupRepository, backup.Spec.Location) |
| return RunExecForPod( |
| solrCloud.GetAllSolrPodNames()[0], |
| solrCloud.Namespace, |
| []string{"/bin/bash", "-c", "mkdir -p " + backupPath}, |
| config, |
| ) |
| } |
| return nil |
| } |
| |
| func RunExecForPod(podName string, namespace string, command []string, config *rest.Config) (err error) { |
| client := &kubernetes.Clientset{} |
| if client, err = kubernetes.NewForConfig(config); err != nil { |
| return err |
| } |
| req := client.CoreV1().RESTClient().Post(). |
| Resource("pods"). |
| Name(podName). |
| Namespace(namespace). |
| SubResource("exec") |
| scheme := runtime.NewScheme() |
| if err = corev1.AddToScheme(scheme); err != nil { |
| return fmt.Errorf("error adding to scheme: %v", err) |
| } |
| |
| parameterCodec := runtime.NewParameterCodec(scheme) |
| req.VersionedParams(&corev1.PodExecOptions{ |
| Command: command, |
| Container: "solrcloud-node", |
| Stdin: false, |
| Stdout: true, |
| Stderr: true, |
| TTY: false, |
| }, parameterCodec) |
| |
| var exec remotecommand.Executor |
| exec, err = remotecommand.NewSPDYExecutor(config, "POST", req.URL()) |
| if err != nil { |
| return fmt.Errorf("error while creating Executor: %v", err) |
| } |
| |
| var stdout, stderr bytes.Buffer |
| err = exec.Stream(remotecommand.StreamOptions{ |
| Stdout: &stdout, |
| Stderr: &stderr, |
| Tty: false, |
| }) |
| |
| if err != nil { |
| return fmt.Errorf("error in Stream: %v", err) |
| } |
| |
| return nil |
| } |
| |
| func ScheduleNextBackup(restartSchedule string, lastBackupTime time.Time) (nextBackup time.Time, err error) { |
| if parsedSchedule, parseErr := cron.ParseStandard(restartSchedule); parseErr != nil { |
| err = parseErr |
| } else { |
| nextBackup = parsedSchedule.Next(lastBackupTime) |
| } |
| return |
| } |
| |
| func ListAllSolrCollections(ctx context.Context, cloud *solr.SolrCloud, logger logr.Logger) (collections []string, err error) { |
| logger.Info("Listing all Solr collections available", "solrCloud", cloud.Name) |
| resp := &solr_api.SolrCollectionsListing{} |
| queryParams := url.Values{} |
| queryParams.Add("action", "LIST") |
| err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, resp) |
| if err == nil { |
| if hasError, apiErr := solr_api.CheckForCollectionsApiError("LIST", resp.ResponseHeader); hasError { |
| err = apiErr |
| } |
| } |
| return resp.Collections, err |
| } |