blob: 00aa37b30003208bec09ac8be60ab2762b4fb126 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"sync"
)
import (
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"istio.io/api/annotation"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/istioctl"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
)
const maxCoreDumpedPods = 5
var coreDumpedPods = atomic.NewInt32(0)
// PodDumper will dump information from all the pods into the given workDir.
// If no pods are provided, client will be used to fetch all the pods in a namespace.
type PodDumper func(ctx resource.Context, cluster cluster.Cluster, workDir string, namespace string, pods ...corev1.Pod)
func podOutputPath(workDir string, cluster cluster.Cluster, pod corev1.Pod, dumpName string) string {
return outputPath(workDir, cluster, pod.Name, dumpName)
}
// outputPath gives a path in the form of workDir/cluster/<prefix>_<suffix>
func outputPath(workDir string, cluster cluster.Cluster, prefix, suffix string) string {
dir := path.Join(workDir, cluster.StableName())
if err := os.MkdirAll(dir, os.ModeDir|0o700); err != nil {
scopes.Framework.Warnf("failed creating directory: %s", dir)
}
return path.Join(dir, fmt.Sprintf("%s_%s", prefix, suffix))
}
func DumpDeployments(ctx resource.Context, workDir, namespace string) {
errG := multierror.Group{}
for _, cluster := range ctx.AllClusters().Kube() {
deps, err := cluster.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
scopes.Framework.Warnf("Error getting deployments: %v", err)
return
}
for _, deployment := range deps.Items {
deployment := deployment
errG.Go(func() error {
out, err := yaml.Marshal(deployment)
if err != nil {
return err
}
return os.WriteFile(outputPath(workDir, cluster, deployment.Name, "deployment.yaml"), out, os.ModePerm)
})
}
}
_ = errG.Wait()
}
func DumpWebhooks(ctx resource.Context, workDir string) {
errG := multierror.Group{}
for _, cluster := range ctx.AllClusters().Kube() {
mwhs, err := cluster.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), metav1.ListOptions{})
if err != nil {
scopes.Framework.Warnf("Error getting mutating webhook configurations: %v", err)
return
}
for _, mwh := range mwhs.Items {
mwh := mwh
errG.Go(func() error {
out, err := yaml.Marshal(mwh)
if err != nil {
return err
}
return os.WriteFile(outputPath(workDir, cluster, mwh.Name, "mutatingwebhook.yaml"), out, os.ModePerm)
})
}
vwhs, err := cluster.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), metav1.ListOptions{})
if err != nil {
scopes.Framework.Warnf("Error getting validating webhook configurations: %v", err)
return
}
for _, vwh := range vwhs.Items {
vwh := vwh
errG.Go(func() error {
out, err := yaml.Marshal(vwh)
if err != nil {
return err
}
return os.WriteFile(outputPath(workDir, cluster, vwh.Name, "validatingwebhook.yaml"), out, os.ModePerm)
})
}
}
_ = errG.Wait()
}
// DumpPods runs each dumper with the selected pods in the given namespace.
// If selectors is empty, all pods in the namespace will be dumpped.
// If no dumpers are provided, their resource state, events, container logs and Envoy information will be dumped.
func DumpPods(ctx resource.Context, workDir, namespace string, selectors []string, dumpers ...PodDumper) {
if len(dumpers) == 0 {
dumpers = []PodDumper{
DumpPodState,
DumpPodEvents,
DumpPodLogs,
DumpPodProxies,
DumpNdsz,
DumpCoreDumps,
}
}
wg := sync.WaitGroup{}
for _, cluster := range ctx.AllClusters().Kube() {
pods, err := cluster.PodsForSelector(context.TODO(), namespace, selectors...)
if err != nil {
scopes.Framework.Warnf("Error getting pods list via kubectl: %v", err)
return
}
if len(pods.Items) == 0 {
continue
}
for _, dump := range dumpers {
cluster, dump := cluster, dump
wg.Add(1)
go func() {
dump(ctx, cluster, workDir, namespace, pods.Items...)
wg.Done()
}()
}
}
wg.Wait()
}
const coredumpDir = "/var/lib/istio"
func DumpCoreDumps(ctx resource.Context, c cluster.Cluster, workDir string, namespace string, pods ...corev1.Pod) {
if coreDumpedPods.Load() >= maxCoreDumpedPods {
return
}
pods = podsOrFetch(c, pods, namespace)
for _, pod := range pods {
if coreDumpedPods.Load() >= maxCoreDumpedPods {
return
}
wroteDumpsForPod := false
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for _, container := range containers {
if container.Name != "istio-proxy" {
continue
}
restarts := containerRestarts(pod, "istio-proxy")
crashed, _ := containerCrashed(pod, "istio-proxy")
if !crashed || restarts == 0 {
// no need to store this dump
continue
}
findDumps := fmt.Sprintf("find %s -name core.*", coredumpDir)
stdout, _, err := c.PodExec(pod.Name, pod.Namespace, container.Name, findDumps)
if err != nil {
scopes.Framework.Warnf("Unable to get core dumps for pod: %s/%s: %v", pod.Namespace, pod.Name, err)
continue
}
for _, cd := range strings.Split(stdout, "\n") {
if strings.TrimSpace(cd) == "" {
continue
}
stdout, _, err := c.PodExec(pod.Name, pod.Namespace, container.Name, "cat "+cd)
if err != nil {
scopes.Framework.Warnf("Unable to get core dumps %v for pod: %s/%s: %v", cd, pod.Namespace, pod.Name, err)
continue
}
fname := podOutputPath(workDir, c, pod, filepath.Base(cd))
if err = os.WriteFile(fname, []byte(stdout), os.ModePerm); err != nil {
scopes.Framework.Warnf("Unable to write envoy core dump log for pod: %s/%s: %v", pod.Namespace, pod.Name, err)
} else {
wroteDumpsForPod = true
}
}
}
if wroteDumpsForPod {
coreDumpedPods.Inc()
}
}
}
func podsOrFetch(a cluster.Cluster, pods []corev1.Pod, namespace string) []corev1.Pod {
if len(pods) == 0 {
podList, err := a.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
scopes.Framework.Warnf("Error getting pods list via kubectl: %v", err)
return nil
}
pods = podList.Items
}
return pods
}
// DumpPodState dumps the pod state for either the provided pods or all pods in the namespace if none are provided.
func DumpPodState(_ resource.Context, c cluster.Cluster, workDir string, namespace string, pods ...corev1.Pod) {
pods = podsOrFetch(c, pods, namespace)
for _, pod := range pods {
out, err := yaml.Marshal(&pod)
if err != nil {
scopes.Framework.Warnf("Error marshaling pod state for output: %v", err)
continue
}
outPath := podOutputPath(workDir, c, pod, "pod-state.yaml")
if err := os.WriteFile(outPath, out, os.ModePerm); err != nil {
scopes.Framework.Infof("Error writing out pod state to file: %v", err)
}
}
}
// DumpPodEvents dumps the pod events for either the provided pods or all pods in the namespace if none are provided.
func DumpPodEvents(_ resource.Context, c cluster.Cluster, workDir, namespace string, pods ...corev1.Pod) {
pods = podsOrFetch(c, pods, namespace)
for _, pod := range pods {
list, err := c.CoreV1().Events(namespace).List(context.TODO(),
metav1.ListOptions{
FieldSelector: "involvedObject.name=" + pod.Name,
})
if err != nil {
scopes.Framework.Warnf("Error getting events list for pod %s/%s via kubectl: %v", namespace, pod.Name, err)
return
}
out, err := yaml.Marshal(list.Items)
if err != nil {
scopes.Framework.Warnf("Error marshaling pod event for output: %v", err)
continue
}
outPath := podOutputPath(workDir, c, pod, "pod-events.yaml")
if err := os.WriteFile(outPath, out, os.ModePerm); err != nil {
scopes.Framework.Infof("Error writing out pod events to file: %v", err)
}
}
}
// containerRestarts checks how many times container has ever restarted
func containerRestarts(pod corev1.Pod, container string) int {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == container {
return int(cs.RestartCount)
}
}
// No match - assume that means no restart
return 0
}
func containerCrashed(pod corev1.Pod, container string) (bool, *corev1.ContainerStateTerminated) {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == container && cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
return true, cs.State.Terminated
}
}
return false, nil
}
// DumpPodLogs will dump logs from each container in each of the provided pods
// or all pods in the namespace if none are provided.
func DumpPodLogs(_ resource.Context, c cluster.Cluster, workDir, namespace string, pods ...corev1.Pod) {
pods = podsOrFetch(c, pods, namespace)
for _, pod := range pods {
isVM := checkIfVM(pod)
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for _, container := range containers {
l, err := c.PodLogs(context.TODO(), pod.Name, pod.Namespace, container.Name, false /* previousLog */)
if err != nil {
scopes.Framework.Warnf("Unable to get logs for pod/container: %s/%s/%s for: %v", pod.Namespace, pod.Name, container.Name, err)
}
fname := podOutputPath(workDir, c, pod, fmt.Sprintf("%s.log", container.Name))
if err = os.WriteFile(fname, []byte(l), os.ModePerm); err != nil {
scopes.Framework.Warnf("Unable to write logs for pod/container: %s/%s/%s", pod.Namespace, pod.Name, container.Name)
}
// Get previous container logs, if applicable
if restarts := containerRestarts(pod, container.Name); restarts > 0 {
// only care about istio components restart
if container.Name == "istio-proxy" || container.Name == "discovery" || container.Name == "istio-init" ||
container.Name == "istio-validation" || strings.HasPrefix(pod.Name, "istio-cni-node") {
// This is only called if the test failed, so we cannot mark it as "failed" again. Instead, output
// a log which will get highlighted in the test logs
// TODO proper analysis of restarts to ensure we do not miss crashes when tests still pass.
scopes.Framework.Errorf("FAIL: pod %v/%v container %v restarted %d times", pod.Name, pod.Namespace, container.Name, restarts)
}
l, err := c.PodLogs(context.TODO(), pod.Name, pod.Namespace, container.Name, true /* previousLog */)
if err != nil {
scopes.Framework.Warnf("Unable to get previous logs for pod/container: %s/%s/%s", pod.Namespace, pod.Name, container.Name)
}
fname := podOutputPath(workDir, c, pod, fmt.Sprintf("%s.previous.log", container.Name))
if err = os.WriteFile(fname, []byte(l), os.ModePerm); err != nil {
scopes.Framework.Warnf("Unable to write previous logs for pod/container: %s/%s/%s", pod.Namespace, pod.Name, container.Name)
}
}
if crashed, terminateState := containerCrashed(pod, container.Name); crashed {
scopes.Framework.Errorf("FAIL: pod %v/%v crashed with status: %+v", pod.Name, container.Name, terminateState)
}
// Get envoy logs if the pod is a VM, since kubectl logs only shows the logs from iptables for VMs
if isVM && container.Name == "istio-proxy" {
if stdout, stderr, err := c.PodExec(pod.Name, pod.Namespace, container.Name, "cat /var/log/istio/istio.err.log"); err == nil {
fname := podOutputPath(workDir, c, pod, fmt.Sprintf("%s.envoy.err.log", container.Name))
if err = os.WriteFile(fname, []byte(stdout+stderr), os.ModePerm); err != nil {
scopes.Framework.Warnf("Unable to write envoy err log for pod/container: %s/%s/%s", pod.Namespace, pod.Name, container.Name)
}
if strings.Contains(stdout, "envoy backtrace") {
scopes.Framework.Errorf("FAIL: VM %v/%v crashed", pod.Name, container.Name)
}
} else {
scopes.Framework.Warnf("Unable to get envoy err log for pod: %s/%s", pod.Namespace, pod.Name)
}
if stdout, stderr, err := c.PodExec(pod.Name, pod.Namespace, container.Name, "cat /var/log/istio/istio.log"); err == nil {
fname := podOutputPath(workDir, c, pod, fmt.Sprintf("%s.envoy.log", container.Name))
if err = os.WriteFile(fname, []byte(stdout+stderr), os.ModePerm); err != nil {
scopes.Framework.Warnf("Unable to write envoy log for pod/container: %s/%s/%s", pod.Namespace, pod.Name, container.Name)
}
} else {
scopes.Framework.Warnf("Unable to get envoy log for pod: %s/%s", pod.Namespace, pod.Name)
}
}
}
}
}
// DumpPodProxies will dump Envoy proxy config and clusters in each of the provided pods
// or all pods in the namespace if none are provided.
func DumpPodProxies(_ resource.Context, c cluster.Cluster, workDir, namespace string, pods ...corev1.Pod) {
pods = podsOrFetch(c, pods, namespace)
for _, pod := range pods {
if !hasEnvoy(pod) {
continue
}
dumpProxyCommand(c, pod, workDir, "proxy-config.json", "pilot-agent request GET config_dump?include_eds=true")
dumpProxyCommand(c, pod, workDir, "proxy-clusters.txt", "pilot-agent request GET clusters")
dumpProxyCommand(c, pod, workDir, "proxy-stats.txt", "pilot-agent request GET stats/prometheus")
}
}
func dumpProxyCommand(c cluster.Cluster, pod corev1.Pod, workDir, filename, command string) {
isVM := checkIfVM(pod)
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for _, container := range containers {
if container.Name != "istio-proxy" && !isVM {
// if we don't have istio-proxy container, and we're not running as a VM, agent isn't running
continue
}
if cfgDump, _, err := c.PodExec(pod.Name, pod.Namespace, container.Name, command); err == nil {
fname := podOutputPath(workDir, c, pod, filename)
if err = os.WriteFile(fname, []byte(cfgDump), os.ModePerm); err != nil {
scopes.Framework.Errorf("Unable to write output for command %q on pod/container: %s/%s/%s", command, pod.Namespace, pod.Name, container.Name)
}
} else {
scopes.Framework.Errorf("Unable to get execute command %q on pod: %s/%s for: %v", command, pod.Namespace, pod.Name, err)
}
}
}
func hasEnvoy(pod corev1.Pod) bool {
if checkIfVM(pod) {
// assume VMs run Envoy
return true
}
f := false
for _, c := range pod.Spec.Containers {
if c.Name == "istio-proxy" {
f = true
break
}
}
if !f {
// no proxy container
return false
}
for k, v := range pod.ObjectMeta.Annotations {
if k == annotation.InjectTemplates.Name && strings.HasPrefix(v, "grpc-") {
// proxy container may run only agent for proxyless gRPC
return false
}
}
return true
}
func checkIfVM(pod corev1.Pod) bool {
for k := range pod.ObjectMeta.Labels {
if strings.Contains(k, "test-vm") {
return true
}
}
return false
}
func DumpDebug(ctx resource.Context, c cluster.Cluster, workDir string, endpoint string) {
ik, err := istioctl.New(ctx, istioctl.Config{Cluster: c})
if err != nil {
scopes.Framework.Warnf("failed dumping %q: %v", endpoint, err)
return
}
args := []string{"x", "internal-debug", "--all", endpoint}
if ctx.Settings().Revisions.Default() != "" {
args = append(args, "--revision", ctx.Settings().Revisions.Default())
}
scopes.Framework.Debugf("dump %v: %v", endpoint, args)
stdout, _, err := ik.Invoke(args)
if err != nil {
scopes.Framework.Warnf("failed dumping %q: %v", endpoint, err)
return
}
outputs := map[string]string{}
if err := json.Unmarshal([]byte(stdout), &outputs); err != nil {
scopes.Framework.Warnf("failed dumping %q: %v", endpoint, err)
return
}
for istiod, out := range outputs {
outPath := outputPath(workDir, c, istiod, endpoint)
if err := os.WriteFile(outPath, []byte(out), 0o644); err != nil {
scopes.Framework.Warnf("failed dumping %q: %v", endpoint, err)
return
}
}
}
func DumpNdsz(_ resource.Context, c cluster.Cluster, workDir string, _ string, pods ...corev1.Pod) {
for _, pod := range pods {
dumpProxyCommand(c, pod, workDir, "ndsz.json", "pilot-agent request --debug-port 15020 GET /debug/ndsz")
}
}