// blob: 76a037ce43a3ebe537d672133279dfcd66eb4618 [file] [log] [blame]
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"encoding/json"
"os"
"time"
"github.com/hashicorp/go-version"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/util/defaults"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/api"
)
// timeoutAnnotation is the annotation key patched onto the builder pod once
// the ContainerBuild has exceeded its timeout. Its presence on a failed pod
// causes the build error to be reported as "ContainerBuild timeout".
const timeoutAnnotation = "sonataflow.org/timeout"
// newMonitorPodAction returns a fresh, zero-valued monitorPodAction as an Action.
func newMonitorPodAction() Action {
	action := new(monitorPodAction)
	return action
}
// monitorPodAction is the action named "monitor-pod". It handles builds in
// the pending and running phases by inspecting the builder pod, and embeds
// baseAction, through which its methods reach action.client.
type monitorPodAction struct {
baseAction
}
// Name reports the fixed identifier of this action.
func (action *monitorPodAction) Name() string {
	const name = "monitor-pod"
	return name
}
// CanHandle reports whether the build is in a phase this action processes,
// i.e. pending or running.
func (action *monitorPodAction) CanHandle(build *api.ContainerBuild) bool {
	switch build.Status.Phase {
	case api.ContainerBuildPhasePending, api.ContainerBuildPhaseRunning:
		return true
	default:
		return false
	}
}
// Handle reconciles the ContainerBuild status with the state of its builder pod.
//
// When no pod exists, a pending build gets a new pod created, and a running
// build is marked interrupted ("Pod deleted"). Otherwise the pod phase is
// mapped onto the build:
//   - Pending/Running: the build becomes running once the pod is scheduled; if
//     the build has exceeded its timeout the pod is annotated and, for Kaniko
//     versions that still ship `kill`, its running containers receive SIGTERM.
//   - Succeeded: the build succeeds, the timeout annotation is removed, and the
//     duration and the Kaniko repository image tag are recorded.
//   - Failed: the build fails (or is interrupted when the pod is being
//     deleted), with the error message and duration recorded.
//
// It returns the updated build, or a nil build and an error when the
// reconciliation should be retried.
func (action *monitorPodAction) Handle(ctx context.Context, build *api.ContainerBuild) (*api.ContainerBuild, error) {
	pod, err := getBuilderPod(ctx, action.client, build)
	if err != nil {
		return nil, err
	}

	if pod == nil {
		switch build.Status.Phase {
		case api.ContainerBuildPhasePending:
			if pod, err = newBuildPod(ctx, action.client, build); err != nil {
				return nil, err
			}
			// TODO: every object we create, must pass to a listener for our client code. For example, an operator would like to add their labels/owner refs
			if err = action.client.Create(ctx, pod); err != nil {
				return nil, errors.Wrap(err, "cannot create build pod")
			}
		case api.ContainerBuildPhaseRunning:
			// Emulate context cancellation
			build.Status.Phase = api.ContainerBuildPhaseInterrupted
			build.Status.Error = "Pod deleted"
			return build, nil
		default:
			// CanHandle only admits pending and running builds. Guard anyway so
			// that a nil pod is never dereferenced below.
			return build, nil
		}
	}

	switch pod.Status.Phase {
	case corev1.PodPending, corev1.PodRunning:
		// Pod remains in pending phase when init containers execute
		if action.isPodScheduled(pod) {
			build.Status.Phase = api.ContainerBuildPhaseRunning
		}
		if time.Since(build.Status.StartedAt.Time) > build.Spec.Timeout.Duration {
			// Patch the Pod with an annotation, to identify termination signal
			// has been sent because the ContainerBuild has timed out
			if err = action.addTimeoutAnnotation(ctx, pod, metav1.Now()); err != nil {
				return nil, err
			}
			// In latest Kaniko versions kill is no more available in image's $PATH, do we still need it?
			// Send SIGTERM signal to running containers, but only for Kaniko
			// versions up to the last one known to support kill.
			current, parseErr := version.NewVersion(defaults.KanikoVersion)
			if parseErr != nil {
				return nil, errors.Wrap(parseErr, "cannot parse Kaniko version")
			}
			maxVersionSupportingKill, parseErr := version.NewVersion(defaults.KanikoVersionSupportingKill)
			if parseErr != nil {
				return nil, errors.Wrap(parseErr, "cannot parse Kaniko version supporting kill")
			}
			if current.LessThanOrEqual(maxVersionSupportingKill) {
				if err = action.sigterm(pod); err != nil {
					// Requeue
					return nil, err
				}
			}
		}

	case corev1.PodSucceeded:
		build.Status.Phase = api.ContainerBuildPhaseSucceeded
		// Remove the annotation in case the ContainerBuild succeeded, between
		// the timeout deadline and the termination signal.
		if err = action.removeTimeoutAnnotation(ctx, pod); err != nil {
			return nil, err
		}
		finishedAt := action.getTerminatedTime(pod)
		duration := finishedAt.Sub(build.Status.StartedAt.Time)
		build.Status.Duration = duration.String()
		// The first Kaniko task found determines the published image tag.
		for _, task := range build.Spec.Tasks {
			if t := task.Kaniko; t != nil {
				build.Status.RepositoryImageTag = t.GetRepositoryImageTag()
				break
			}
		}

	case corev1.PodFailed:
		phase := api.ContainerBuildPhaseFailed
		message := "Pod failed"
		if terminationMessage := action.getTerminationMessage(pod); terminationMessage != "" {
			message = terminationMessage
		}
		if pod.DeletionTimestamp != nil {
			phase = api.ContainerBuildPhaseInterrupted
			message = "Pod deleted"
		} else if _, ok := pod.GetAnnotations()[timeoutAnnotation]; ok {
			message = "ContainerBuild timeout"
		}
		// Do not override errored build
		if build.Status.Phase == api.ContainerBuildPhaseError {
			phase = api.ContainerBuildPhaseError
		}
		build.Status.Phase = phase
		build.Status.Error = message
		finishedAt := action.getTerminatedTime(pod)
		duration := finishedAt.Sub(build.Status.StartedAt.Time)
		build.Status.Duration = duration.String()
	}

	return build, nil
}
// sigterm executes `kill -SIGTERM 1` through the pod exec subresource in every
// init and regular container of the pod whose state is Running. Command output
// is streamed to this process's stdout and stderr. It stops at, and returns,
// the first error encountered.
func (action *monitorPodAction) sigterm(pod *corev1.Pod) error {
	// Init containers are visited first, then the regular containers.
	groups := [][]corev1.ContainerStatus{
		pod.Status.InitContainerStatuses,
		pod.Status.ContainerStatuses,
	}
	for _, statuses := range groups {
		for _, status := range statuses {
			if status.State.Running == nil {
				continue
			}

			request := action.client.CoreV1().RESTClient().Post().
				Resource("pods").
				Namespace(pod.Namespace).
				Name(pod.Name).
				SubResource("exec").
				Param("container", status.Name)
			execOptions := &corev1.PodExecOptions{
				Container: status.Name,
				Command:   []string{"kill", "-SIGTERM", "1"},
				Stdout:    true,
				Stderr:    true,
				TTY:       false,
			}
			request.VersionedParams(execOptions, scheme.ParameterCodec)

			executor, err := remotecommand.NewSPDYExecutor(action.client.GetConfig(), "POST", request.URL())
			if err != nil {
				return err
			}

			streams := remotecommand.StreamOptions{
				Stdout: os.Stdout,
				Stderr: os.Stderr,
				Tty:    false,
			}
			if err := executor.Stream(streams); err != nil {
				return err
			}
		}
	}
	return nil
}
// isPodScheduled reports whether the pod carries a PodScheduled condition
// whose status is True.
func (action *monitorPodAction) isPodScheduled(pod *corev1.Pod) bool {
	conditions := pod.Status.Conditions
	for i := range conditions {
		if conditions[i].Type != corev1.PodScheduled {
			continue
		}
		if conditions[i].Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}
// addTimeoutAnnotation patches the pod with the timeout annotation, using the
// string form of the given time as its value. It is a no-op when the pod
// already carries the annotation.
func (action *monitorPodAction) addTimeoutAnnotation(ctx context.Context, pod *corev1.Pod, time metav1.Time) error {
	if _, annotated := pod.GetAnnotations()[timeoutAnnotation]; annotated {
		return nil
	}

	stamp := func(target *corev1.Pod) {
		annotations := target.GetAnnotations()
		if annotations == nil {
			// No annotation map yet: install one holding only the timeout entry.
			target.SetAnnotations(map[string]string{
				timeoutAnnotation: time.String(),
			})
			return
		}
		annotations[timeoutAnnotation] = time.String()
	}
	return action.patchPod(ctx, pod, stamp)
}
// removeTimeoutAnnotation patches the pod to drop the timeout annotation. It
// is a no-op when the pod does not carry the annotation.
func (action *monitorPodAction) removeTimeoutAnnotation(ctx context.Context, pod *corev1.Pod) error {
	_, annotated := pod.GetAnnotations()[timeoutAnnotation]
	if !annotated {
		return nil
	}

	strip := func(target *corev1.Pod) {
		delete(target.GetAnnotations(), timeoutAnnotation)
	}
	return action.patchPod(ctx, pod, strip)
}
// patchPod applies mutate to a deep copy of the pod and sends the difference
// against the original as a merge patch. On success the caller's pod is
// overwritten with the patched copy; on failure it is left untouched and the
// error is returned.
func (action *monitorPodAction) patchPod(ctx context.Context, pod *corev1.Pod, mutate func(*corev1.Pod)) error {
	updated := pod.DeepCopy()
	mutate(updated)

	err := action.client.Patch(ctx, updated, ctrl.MergeFrom(pod))
	if err == nil {
		*pod = *updated
	}
	return err
}
// getTerminatedTime returns the latest FinishedAt among the terminated init
// and regular containers of the pod. The zero time is returned when no
// container has terminated.
func (action *monitorPodAction) getTerminatedTime(pod *corev1.Pod) metav1.Time {
	var latest metav1.Time
	groups := [][]corev1.ContainerStatus{
		pod.Status.InitContainerStatuses,
		pod.Status.ContainerStatuses,
	}
	for _, statuses := range groups {
		for _, status := range statuses {
			terminated := status.State.Terminated
			if terminated == nil {
				// The container has not run
				continue
			}
			if latest.IsZero() || terminated.FinishedAt.After(latest.Time) {
				latest = terminated.FinishedAt
			}
		}
	}
	return latest
}
// getTerminationMessage collects the termination messages of the init and
// regular containers that terminated with a non-zero exit code and a
// non-empty message. It returns "" when there are none, the message itself
// when there is exactly one, and otherwise a JSON array of container/message
// pairs (or "" if that encoding fails).
func (action *monitorPodAction) getTerminationMessage(pod *corev1.Pod) string {
	var failures []terminationMessage
	groups := [][]corev1.ContainerStatus{
		pod.Status.InitContainerStatuses,
		pod.Status.ContainerStatuses,
	}
	for _, statuses := range groups {
		for _, status := range statuses {
			terminated := status.State.Terminated
			if terminated == nil || terminated.ExitCode == 0 || terminated.Message == "" {
				continue
			}
			failures = append(failures, terminationMessage{
				Container: status.Name,
				Message:   terminated.Message,
			})
		}
	}

	if len(failures) == 0 {
		return ""
	}
	if len(failures) == 1 {
		return failures[0].Message
	}

	encoded, err := json.Marshal(failures)
	if err != nil {
		return ""
	}
	return string(encoded)
}
// terminationMessage pairs a container name with the message it reported on
// termination. It is the element type JSON-encoded by getTerminationMessage
// when more than one container failed with a message.
type terminationMessage struct {
// Container is the name of the container that terminated.
Container string `json:"container,omitempty"`
// Message is the termination message reported for that container.
Message string `json:"message,omitempty"`
}