| /* |
| Copyright 2018 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package scale |
| |
| import ( |
| "fmt" |
| "strconv" |
| "time" |
| |
| batch "k8s.io/api/batch/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/util/wait" |
| batchclient "k8s.io/client-go/kubernetes/typed/batch/v1" |
| ) |
| |
// ScalePrecondition is a deprecated precondition: a snapshot of expected
// state that must match the live Job before a scale is applied.
type ScalePrecondition struct {
	// Size is the expected parallelism; -1 disables the size check
	// (see validateJob).
	Size int
	// ResourceVersion is the expected resource version; the empty string
	// disables the resource-version check (see validateJob).
	ResourceVersion string
}
| |
// RetryParams is a deprecated retry struct: Interval is the polling period
// between attempts and Timeout bounds the total time spent polling (both are
// fed to wait.PollImmediate in Scale).
type RetryParams struct {
	Interval, Timeout time.Duration
}
| |
// PreconditionError is a deprecated error reporting that a ScalePrecondition
// did not match the live object; it records which precondition failed along
// with the expected and observed values.
type PreconditionError struct {
	Precondition  string
	ExpectedValue string
	ActualValue   string
}

// Error renders the mismatch as a human-readable message.
func (pe PreconditionError) Error() string {
	// NOTE(review): keep the exact historical wording — callers may match on it.
	msg := fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
	return msg
}
| |
| // ScaleCondition is a closure around Scale that facilitates retries via util.wait |
| func scaleCondition(r *JobPsuedoScaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc { |
| return func() (bool, error) { |
| rv, err := r.ScaleSimple(namespace, name, precondition, count) |
| if updatedResourceVersion != nil { |
| *updatedResourceVersion = rv |
| } |
| // Retry only on update conflicts. |
| if errors.IsConflict(err) { |
| return false, nil |
| } |
| if err != nil { |
| return false, err |
| } |
| return true, nil |
| } |
| } |
| |
// JobPsuedoScaler is a deprecated scale-similar thing that doesn't obey scale
// semantics: "scaling" a Job here means mutating Spec.Parallelism rather than
// a true replica count.
type JobPsuedoScaler struct {
	// JobsClient is used to Get and Update Jobs in the target namespace.
	JobsClient batchclient.JobsGetter
}
| |
| // ScaleSimple is responsible for updating job's parallelism. It returns the |
| // resourceVersion of the job if the update is successful. |
| func (scaler *JobPsuedoScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { |
| job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{}) |
| if err != nil { |
| return "", err |
| } |
| if preconditions != nil { |
| if err := validateJob(job, preconditions); err != nil { |
| return "", err |
| } |
| } |
| parallelism := int32(newSize) |
| job.Spec.Parallelism = ¶llelism |
| updatedJob, err := scaler.JobsClient.Jobs(namespace).Update(job) |
| if err != nil { |
| return "", err |
| } |
| return updatedJob.ObjectMeta.ResourceVersion, nil |
| } |
| |
| // Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil), |
| // optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired |
| // number, which can be less than requested based on job's current progress. |
| func (scaler *JobPsuedoScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { |
| if preconditions == nil { |
| preconditions = &ScalePrecondition{-1, ""} |
| } |
| if retry == nil { |
| // Make it try only once, immediately |
| retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} |
| } |
| cond := scaleCondition(scaler, preconditions, namespace, name, newSize, nil) |
| if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { |
| return err |
| } |
| if waitForReplicas != nil { |
| job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, jobHasDesiredParallelism(scaler.JobsClient, job)) |
| if err == wait.ErrWaitTimeout { |
| return fmt.Errorf("timed out waiting for %q to be synced", name) |
| } |
| return err |
| } |
| return nil |
| } |
| |
| // JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count |
| // for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. |
| func jobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc { |
| return func() (bool, error) { |
| job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| |
| // desired parallelism can be either the exact number, in which case return immediately |
| if job.Status.Active == *job.Spec.Parallelism { |
| return true, nil |
| } |
| if job.Spec.Completions == nil { |
| // A job without specified completions needs to wait for Active to reach Parallelism. |
| return false, nil |
| } |
| |
| // otherwise count successful |
| progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded |
| return progress <= 0, nil |
| } |
| } |
| |
| func validateJob(job *batch.Job, precondition *ScalePrecondition) error { |
| if precondition.Size != -1 && job.Spec.Parallelism == nil { |
| return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"} |
| } |
| if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size { |
| return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))} |
| } |
| if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion { |
| return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion} |
| } |
| return nil |
| } |