| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Package tool provides tools for operator communicating with K8S cluster |
| package tool |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "os" |
| "path/filepath" |
| |
| "github.com/pkg/errors" |
| core_v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/rest" |
| "k8s.io/client-go/tools/clientcmd" |
| "k8s.io/client-go/tools/remotecommand" |
| ) |
| |
| // K8sClient is a struct which contains the kubernetes.Interface and *rest.Config. |
| type K8sClient struct { |
| // kubernetes.Interface should be used instead of kubernets.Inteface for unit test (mocking) |
| ClientSet kubernetes.Interface |
| Config *rest.Config |
| } |
| |
| // NewK8sClient is to generate a K8s client for interacting with the K8S cluster. |
| func NewK8sClient() (*K8sClient, error) { |
| var kubeconfig string |
| if kubeConfigPath := os.Getenv("KUBECONFIG"); kubeConfigPath != "" { |
| kubeconfig = kubeConfigPath // CI process |
| } else { |
| kubeconfig = filepath.Join(os.Getenv("HOME"), ".kube", "config") // Development environment |
| } |
| |
| var config *rest.Config |
| |
| _, err := os.Stat(kubeconfig) |
| if err != nil { |
| // In cluster configuration |
| config, err = rest.InClusterConfig() |
| if err != nil { |
| return nil, err |
| } |
| } else { |
| // Out of cluster configuration |
| config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| clientset, err := kubernetes.NewForConfig(config) |
| if err != nil { |
| return nil, err |
| } |
| var client = &K8sClient{ClientSet: clientset, Config: config} |
| return client, nil |
| } |
| |
| // Exec enables operator to execute command in the pod's container in the K8S cluster. |
| // It returns the standard output and the standard error output. |
| func (client *K8sClient) Exec(namespace, podName, containerName string, command []string, stdin io.Reader) (*bytes.Buffer, *bytes.Buffer, error) { |
| clientset, config := client.ClientSet, client.Config |
| |
| req := clientset.CoreV1().RESTClient().Post(). |
| Resource("pods"). |
| Name(podName). |
| Namespace(namespace). |
| SubResource("exec") |
| |
| scheme := runtime.NewScheme() |
| if err := core_v1.AddToScheme(scheme); err != nil { |
| return nil, nil, fmt.Errorf("error adding to scheme: %v", err) |
| } |
| |
| parameterCodec := runtime.NewParameterCodec(scheme) |
| req.VersionedParams(&core_v1.PodExecOptions{ |
| Command: command, |
| Container: containerName, |
| Stdin: stdin != nil, |
| Stdout: true, |
| Stderr: true, |
| TTY: false, |
| }, parameterCodec) |
| |
| exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) |
| if err != nil { |
| return nil, nil, errors.Wrap(err, "Could not create SPDY executor") |
| } |
| |
| var stdout, stderr bytes.Buffer |
| err = exec.Stream(remotecommand.StreamOptions{ |
| Stdin: stdin, |
| Stdout: &stdout, |
| Stderr: &stderr, |
| Tty: false, |
| }) |
| |
| if err != nil { |
| return nil, nil, errors.Wrap(err, "Error while exec'ing stream") |
| } |
| |
| return &stdout, &stderr, nil |
| } |