blob: 58dcffcfeaef5407bc86fe289c5ae253b57603f5 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubectlcmd
import (
"bytes"
"context"
"fmt"
"os/exec"
"strings"
"sync"
"time"
)
import (
"golang.org/x/time/rate"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/operator/pkg/util"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
"github.com/apache/dubbo-go-pixiu/tools/bug-report/pkg/common"
)
const (
// maxRequestsPerSecond is the max rate of requests to the API server.
maxRequestsPerSecond = 10
// maxLogFetchConcurrency is the max number of logs to fetch simultaneously.
maxLogFetchConcurrency = 10
// reportInterval controls how frequently to output progress reports on running tasks.
reportInterval = 30 * time.Second
)
var (
requestLimiter = rate.NewLimiter(maxRequestsPerSecond, maxRequestsPerSecond)
logFetchLimitCh = make(chan struct{}, maxLogFetchConcurrency)
// runningTasks tracks the in-flight fetch operations for user feedback.
runningTasks = sets.New()
runningTasksMu sync.RWMutex
// runningTasksTicker is the report interval for running tasks.
runningTasksTicker = time.NewTicker(reportInterval)
)
func ReportRunningTasks() {
go func() {
time.Sleep(reportInterval)
for range runningTasksTicker.C {
printRunningTasks()
}
}()
}
// Options contains the Run options.
type Options struct {
// Path to the kubeconfig file.
Kubeconfig string
// ComponentName of the kubeconfig context to use.
Context string
// namespace - k8s namespace for Run command
Namespace string
// DryRun performs all steps but only logs the Run command without running it.
DryRun bool
// Maximum amount of time to wait for resources to be ready after install when Wait=true.
WaitTimeout time.Duration
// output - output mode for Run i.e. --output.
Output string
// extraArgs - more args to be added to the Run command, which are appended to
// the end of the Run command.
ExtraArgs []string
}
// Logs returns the logs for the given namespace/pod/container.
func Logs(client kube.ExtendedClient, namespace, pod, container string, previous, dryRun bool) (string, error) {
if dryRun {
return fmt.Sprintf("Dry run: would be running client.PodLogs(%s, %s, %s)", pod, namespace, container), nil
}
// ignore cancellation errors since this is subject to global timeout.
_ = requestLimiter.Wait(context.TODO())
logFetchLimitCh <- struct{}{}
defer func() {
<-logFetchLimitCh
}()
task := fmt.Sprintf("PodLogs %s/%s/%s", namespace, pod, container)
addRunningTask(task)
defer removeRunningTask(task)
return client.PodLogs(context.TODO(), pod, namespace, container, previous)
}
// EnvoyGet sends a GET request for the URL in the Envoy container in the given namespace/pod and returns the result.
func EnvoyGet(client kube.ExtendedClient, namespace, pod, url string, dryRun bool) (string, error) {
if dryRun {
return fmt.Sprintf("Dry run: would be running client.EnvoyDo(%s, %s, %s)", pod, namespace, url), nil
}
_ = requestLimiter.Wait(context.TODO())
task := fmt.Sprintf("ProxyGet %s/%s:%s", namespace, pod, url)
addRunningTask(task)
defer removeRunningTask(task)
out, err := client.EnvoyDo(context.TODO(), pod, namespace, "GET", url)
return string(out), err
}
// Cat runs the cat command for the given path in the given namespace/pod/container.
func Cat(client kube.ExtendedClient, namespace, pod, container, path string, dryRun bool) (string, error) {
cmdStr := "cat " + path
if dryRun {
return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
}
_ = requestLimiter.Wait(context.TODO())
logFetchLimitCh <- struct{}{}
defer func() {
<-logFetchLimitCh
}()
task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
addRunningTask(task)
defer removeRunningTask(task)
stdout, stderr, err := client.PodExec(pod, namespace, container, cmdStr)
if err != nil {
return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
err, util.ConsolidateLog(stderr), stdout)
}
return stdout, nil
}
// Exec runs exec for the given command in the given namespace/pod/container.
func Exec(client kube.ExtendedClient, namespace, pod, container, cmdStr string, dryRun bool) (string, error) {
if dryRun {
return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
}
_ = requestLimiter.Wait(context.TODO())
task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
addRunningTask(task)
defer removeRunningTask(task)
stdout, stderr, err := client.PodExec(pod, namespace, container, cmdStr)
if err != nil {
return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
err, util.ConsolidateLog(stderr), stdout)
}
return stdout, nil
}
// RunCmd runs the given command in kubectl, adding -n namespace if namespace is not empty.
func RunCmd(command, namespace, kubeConfig, kubeContext string, dryRun bool) (string, error) {
return Run(strings.Split(command, " "),
&Options{
Namespace: namespace,
DryRun: dryRun,
Kubeconfig: kubeConfig,
Context: kubeContext,
})
}
// Run runs the kubectl command by specifying subcommands in subcmds with opts.
func Run(subcmds []string, opts *Options) (string, error) {
args := subcmds
if opts.Kubeconfig != "" {
args = append(args, "--kubeconfig", opts.Kubeconfig)
}
if opts.Context != "" {
args = append(args, "--context", opts.Context)
}
if opts.Namespace != "" {
args = append(args, "-n", opts.Namespace)
}
if opts.Output != "" {
args = append(args, "-o", opts.Output)
}
args = append(args, opts.ExtraArgs...)
cmd := exec.Command("kubectl", args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmdStr := strings.Join(args, " ")
if opts.DryRun {
log.Infof("dry run mode: would be running this cmd:\nkubectl %s\n", cmdStr)
return "", nil
}
_ = requestLimiter.Wait(context.TODO())
task := fmt.Sprintf("kubectl %s", cmdStr)
addRunningTask(task)
defer removeRunningTask(task)
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("kubectl error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
err, util.ConsolidateLog(stderr.String()), stdout.String())
}
return stdout.String(), nil
}
func printRunningTasks() {
runningTasksMu.RLock()
defer runningTasksMu.RUnlock()
if runningTasks.IsEmpty() {
return
}
common.LogAndPrintf("The following fetches are still running: \n")
for t := range runningTasks {
common.LogAndPrintf(" %s\n", t)
}
common.LogAndPrintf("\n")
}
func addRunningTask(task string) {
runningTasksMu.Lock()
defer runningTasksMu.Unlock()
log.Infof("STARTING %s", task)
runningTasks.Insert(task)
}
func removeRunningTask(task string) {
runningTasksMu.Lock()
defer runningTasksMu.Unlock()
log.Infof("COMPLETED %s", task)
runningTasks.Delete(task)
}