/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package tool provides utilities the operator uses to communicate with the K8s cluster.
package tool
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"github.com/pkg/errors"
core_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)
// K8sClient holds a kubernetes.Interface together with the *rest.Config
// used to talk to the cluster.
type K8sClient struct {
	// ClientSet is typed as kubernetes.Interface, not a concrete clientset,
	// so that unit tests can supply a mock implementation.
	ClientSet kubernetes.Interface

	// Config is the REST configuration; Exec passes it to the SPDY executor.
	Config *rest.Config
}
// NewK8sClient builds a K8sClient for interacting with the K8s cluster.
//
// The kubeconfig path is taken from the KUBECONFIG environment variable when
// it is non-empty (CI process); otherwise it is $HOME/.kube/config
// (development environment). If that path can be stat'ed, the configuration
// is loaded from it (out-of-cluster); if the stat fails for any reason, the
// in-cluster configuration is used instead.
//
// Any error from loading the configuration or creating the clientset is
// returned unchanged, with a nil client.
func NewK8sClient() (*K8sClient, error) {
	path := os.Getenv("KUBECONFIG")
	if path == "" {
		path = filepath.Join(os.Getenv("HOME"), ".kube", "config")
	}

	var (
		cfg *rest.Config
		err error
	)
	if _, statErr := os.Stat(path); statErr == nil {
		// Out-of-cluster configuration.
		cfg, err = clientcmd.BuildConfigFromFlags("", path)
	} else {
		// In-cluster configuration.
		cfg, err = rest.InClusterConfig()
	}
	if err != nil {
		return nil, err
	}

	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return &K8sClient{ClientSet: cs, Config: cfg}, nil
}
// Exec lets the operator execute command in the named container of a pod by
// POSTing to the pod's "exec" subresource through an SPDY executor.
//
// Parameters:
//   - namespace, podName, containerName: identify the target container.
//   - command: the command and its arguments.
//   - stdin: optional standard input; when nil, no stdin is attached.
//
// It returns the captured standard output and standard error output. If the
// request or the executor cannot be prepared, both buffers are nil. If
// streaming returns an error, that error is returned (wrapped) together with
// whatever was written to the two buffers before the failure, so callers can
// still inspect the remote output (typically stderr) when diagnosing it.
func (client *K8sClient) Exec(namespace, podName, containerName string, command []string, stdin io.Reader) (*bytes.Buffer, *bytes.Buffer, error) {
	clientset, config := client.ClientSet, client.Config

	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")

	// A scheme with the core/v1 types registered is needed to encode
	// PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := core_v1.AddToScheme(scheme); err != nil {
		return nil, nil, fmt.Errorf("error adding to scheme: %v", err)
	}
	parameterCodec := runtime.NewParameterCodec(scheme)

	req.VersionedParams(&core_v1.PodExecOptions{
		Command:   command,
		Container: containerName,
		Stdin:     stdin != nil, // only request stdin when a reader was supplied
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}, parameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return nil, nil, errors.Wrap(err, "Could not create SPDY executor")
	}

	var stdout, stderr bytes.Buffer
	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: &stdout,
		Stderr: &stderr,
		Tty:    false,
	})
	if err != nil {
		// Keep the partial output: discarding it would hide the remote
		// command's own diagnostics from the caller.
		return &stdout, &stderr, errors.Wrap(err, "Error while exec'ing stream")
	}
	return &stdout, &stderr, nil
}