blob: 26180e6aff92988f7323efb7a703ee5b642e2d2a [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"gopkg.in/gcfg.v1"
"github.com/vmware/govmomi/vapi/rest"
"github.com/vmware/govmomi/vapi/tags"
"github.com/vmware/govmomi/vim25/mo"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers"
)
// VSphere Cloud Provider constants
const (
	// ProviderName is the name under which this cloud provider is registered
	// and the value returned by (*VSphere).ProviderName.
	ProviderName = "vsphere"
	// VolDir is not referenced in the visible part of this file; presumably
	// the datastore directory that holds Kubernetes volumes — TODO confirm.
	VolDir = "kubevols"
	// RoundTripperDefaultCount is the SOAP round tripper count applied when
	// the configuration leaves soap-roundtrip-count at 0.
	RoundTripperDefaultCount = 3
	// DummyVMPrefixName is not referenced in the visible part of this file;
	// presumably the name prefix given to dummy VMs — TODO confirm.
	DummyVMPrefixName = "vsphere-k8s"
	// MacOuiVC is one of the two MAC address prefixes getLocalIP matches
	// against a network interface's hardware address.
	MacOuiVC = "00:50:56"
	// MacOuiEsx is the other MAC address prefix matched by getLocalIP.
	MacOuiEsx = "00:0c:29"
	// CleanUpDummyVMRoutineInterval is not referenced in the visible part of
	// this file; its unit is not visible here — TODO confirm.
	CleanUpDummyVMRoutineInterval = 5
)
// Package-level state. None of these variables is referenced in the visible
// part of this file, so the notes below are assumptions to be confirmed.

// cleanUpRoutineInitialized presumably records whether the dummy VM clean-up
// routine has been started — TODO confirm.
var cleanUpRoutineInitialized = false

// datastoreFolderIDMap is a two-level string map; judging by its name it
// caches folder IDs per datastore — TODO confirm key meaning against users.
var datastoreFolderIDMap = make(map[string]map[string]string)

// cleanUpRoutineInitLock presumably guards cleanUpRoutineInitialized — TODO confirm.
var cleanUpRoutineInitLock sync.Mutex

// cleanUpDummyVMLock presumably serializes dummy VM clean-up against dummy VM
// usage — TODO confirm.
var cleanUpDummyVMLock sync.RWMutex
// Error Messages
const (
	// MissingUsernameErrMsg is the text of ErrUsernameMissing.
	MissingUsernameErrMsg = "Username is missing"
	// MissingPasswordErrMsg is the text of ErrPasswordMissing.
	MissingPasswordErrMsg = "Password is missing"
)
// Error constants
var (
	// ErrUsernameMissing is returned by populateVsphereInstanceMap when no
	// secret is configured and no username can be found in the config.
	ErrUsernameMissing = errors.New(MissingUsernameErrMsg)
	// ErrPasswordMissing is returned by populateVsphereInstanceMap when no
	// secret is configured and no password can be found in the config.
	ErrPasswordMissing = errors.New(MissingPasswordErrMsg)
)
// VSphere is an implementation of cloud provider Interface for VSphere.
type VSphere struct {
	// cfg is the parsed vsphere.conf. It is left nil by newWorkerNode, and
	// several methods use cfg == nil to detect a worker node.
	cfg *VSphereConfig
	// hostName is the local host name as reported by os.Hostname.
	hostName string
	// Maps the VSphere IP address to VSphereInstance
	vsphereInstanceMap map[string]*VSphereInstance
	// Responsible for managing discovery of k8s node, their location etc.
	nodeManager *NodeManager
	// vmUUID is the UUID of the local VM, taken from the vm-uuid config
	// option when set and otherwise obtained from GetVMUUID.
	vmUUID string
	// isSecretInfoProvided is true when both secret-name and
	// secret-namespace are set in the Global config section.
	isSecretInfoProvided bool
}
// VSphereInstance represents a vSphere instance where one or more kubernetes
// nodes are running.
type VSphereInstance struct {
	// conn holds the connection parameters for this vCenter.
	conn *vclib.VSphereConnection
	// cfg is the per-vCenter configuration the connection was built from.
	cfg *VirtualCenterConfig
}
// VirtualCenterConfig represents the configuration of a single Virtual Center.
// In populateVsphereInstanceMap, empty User, Password, VCenterPort,
// Datacenters and RoundTripperCount values are filled from the Global section.
type VirtualCenterConfig struct {
	// vCenter username.
	User string `gcfg:"user"`
	// vCenter password in clear text.
	Password string `gcfg:"password"`
	// vCenter port.
	VCenterPort string `gcfg:"port"`
	// Datacenter in which VMs are located.
	Datacenters string `gcfg:"datacenters"`
	// Soap round tripper count (retries = RoundTripper - 1)
	RoundTripperCount uint `gcfg:"soap-roundtrip-count"`
	// Thumbprint of the vCenter's certificate.
	Thumbprint string `gcfg:"thumbprint"`
}
// VSphereConfig represents the content of the vsphere.conf file.
// Users specify the configuration of one or more Virtual Centers in vsphere.conf where
// the Kubernetes master and worker nodes are running.
type VSphereConfig struct {
	// Global holds defaults shared by all Virtual Centers plus the deprecated
	// single-vCenter settings.
	Global struct {
		// vCenter username.
		User string `gcfg:"user"`
		// vCenter password in clear text.
		Password string `gcfg:"password"`
		// Deprecated. Use VirtualCenter to specify multiple vCenter Servers.
		// vCenter IP.
		VCenterIP string `gcfg:"server"`
		// vCenter port.
		VCenterPort string `gcfg:"port"`
		// True if vCenter uses self-signed cert.
		InsecureFlag bool `gcfg:"insecure-flag"`
		// Specifies the path to a CA certificate in PEM format. Optional; if not
		// configured, the system's CA certificates will be used.
		CAFile string `gcfg:"ca-file"`
		// Thumbprint of the vCenter's certificate.
		Thumbprint string `gcfg:"thumbprint"`
		// Datacenter in which VMs are located.
		// Deprecated. Use "datacenters" instead.
		Datacenter string `gcfg:"datacenter"`
		// Datacenter in which VMs are located.
		Datacenters string `gcfg:"datacenters"`
		// Datastore in which vmdks are stored.
		// Deprecated. See Workspace.DefaultDatastore
		DefaultDatastore string `gcfg:"datastore"`
		// WorkingDir is path where VMs can be found. Also used to create dummy VMs.
		// Deprecated.
		WorkingDir string `gcfg:"working-dir"`
		// Soap round tripper count (retries = RoundTripper - 1)
		RoundTripperCount uint `gcfg:"soap-roundtrip-count"`
		// Is required on the controller-manager if it does not run on a VMware machine
		// VMUUID is the VM Instance UUID of virtual machine which can be retrieved from instanceUuid
		// property in VmConfigInfo, or also set as vc.uuid in VMX file.
		// If not set, will be fetched from the machine via sysfs (requires root)
		VMUUID string `gcfg:"vm-uuid"`
		// Deprecated as virtual machine will be automatically discovered.
		// VMName is the VM name of virtual machine
		// Combining the WorkingDir and VMName can form a unique InstanceID.
		// When vm-name is set, no username/password is required on worker nodes.
		VMName string `gcfg:"vm-name"`
		// Name of the secret where vCenter credentials are present.
		SecretName string `gcfg:"secret-name"`
		// Secret Namespace where secret will be present that has vCenter credentials.
		SecretNamespace string `gcfg:"secret-namespace"`
	}
	// VirtualCenter maps a vCenter server address to its configuration.
	VirtualCenter map[string]*VirtualCenterConfig
	Network struct {
		// PublicNetwork is name of the network the VMs are joined to.
		PublicNetwork string `gcfg:"public-network"`
	}
	Disk struct {
		// SCSIControllerType defines SCSI controller to be used.
		// The tag key was previously misspelled "dcfg", which gcfg ignores;
		// the option then only parsed through gcfg's case-insensitive
		// field-name match. The accepted option name is unchanged.
		SCSIControllerType string `gcfg:"scsicontrollertype"`
	}
	// Endpoint used to create volumes
	Workspace struct {
		VCenterIP        string `gcfg:"server"`
		Datacenter       string `gcfg:"datacenter"`
		Folder           string `gcfg:"folder"`
		DefaultDatastore string `gcfg:"default-datastore"`
		ResourcePoolPath string `gcfg:"resourcepool-path"`
	}
	// Tag categories and tags which correspond to "built-in node labels: zones and region"
	Labels struct {
		Zone   string `gcfg:"zone"`
		Region string `gcfg:"region"`
	}
}
// Volumes is the set of volume operations (attach, detach, attachment checks,
// create and delete of vmdk disks) exposed by the vSphere cloud provider.
type Volumes interface {
	// AttachDisk attaches given disk to given node. Current node
	// is used when nodeName is empty string.
	AttachDisk(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error)
	// DetachDisk detaches given disk from given node. Current node
	// is used when nodeName is empty string.
	// Assumption: If node doesn't exist, disk is already detached from node.
	DetachDisk(volPath string, nodeName k8stypes.NodeName) error
	// DiskIsAttached checks if a disk is attached to the given node.
	// Assumption: If node doesn't exist, disk is not attached to the node.
	DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error)
	// DisksAreAttached checks if a list of disks are attached to the given nodes.
	// Assumption: If node doesn't exist, disks are not attached to the node.
	DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error)
	// CreateVolume creates a new vmdk with specified parameters.
	CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error)
	// DeleteVolume deletes vmdk.
	DeleteVolume(vmDiskPath string) error
}
// readConfig decodes the vSphere cloud config read from config into a
// VSphereConfig. A nil reader yields a zero VSphereConfig and an error;
// otherwise the result of gcfg.ReadInto is returned together with whatever
// was decoded.
func readConfig(config io.Reader) (VSphereConfig, error) {
	var parsed VSphereConfig
	if config == nil {
		return parsed, fmt.Errorf("no vSphere cloud provider config file given")
	}
	readErr := gcfg.ReadInto(&parsed, config)
	return parsed, readErr
}
// init registers the vclib metrics and the vSphere cloud provider factory
// under ProviderName.
func init() {
	vclib.RegisterMetrics()

	// factory builds a worker-node provider when no config is supplied and a
	// controller-node provider from the parsed config otherwise.
	factory := func(config io.Reader) (cloudprovider.Interface, error) {
		// If vSphere.conf file is not present then it is worker node.
		if config == nil {
			return newWorkerNode()
		}
		parsed, err := readConfig(config)
		if err != nil {
			return nil, err
		}
		return newControllerNode(parsed)
	}
	cloudprovider.RegisterCloudProvider(ProviderName, factory)
}
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider.
// The body is empty, so both clientBuilder and stop are ignored.
func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
}
// SetInformers hooks the provider into the shared informer factory.
// It does nothing when vs.cfg is nil. When secret info was provided it
// installs a SecretCredentialManager (backed by the factory's Secret lister)
// on the node manager, and it always registers NodeAdded / NodeDeleted as
// handlers on the Node informer.
func (vs *VSphere) SetInformers(informerFactory informers.SharedInformerFactory) {
	if vs.cfg == nil {
		return
	}

	if vs.isSecretInfoProvided {
		credentialCache := &SecretCache{
			VirtualCenter: make(map[string]*Credential),
		}
		vs.nodeManager.UpdateCredentialManager(&SecretCredentialManager{
			SecretName:      vs.cfg.Global.SecretName,
			SecretNamespace: vs.cfg.Global.SecretNamespace,
			SecretLister:    informerFactory.Core().V1().Secrets().Lister(),
			Cache:           credentialCache,
		})
	}

	// Only on controller node it is required to register listeners.
	klog.V(4).Infof("Setting up node informers for vSphere Cloud Provider")
	nodeHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    vs.NodeAdded,
		DeleteFunc: vs.NodeDeleted,
	}
	informerFactory.Core().V1().Nodes().Informer().AddEventHandler(nodeHandlers)
	klog.V(4).Infof("Node informers in vSphere cloud provider initialized")
}
// newWorkerNode builds the provider used when no config file is given.
// Only hostName and vmUUID are populated; cfg, nodeManager and the instance
// map stay at their zero values. Any lookup failure is logged and returned.
func newWorkerNode() (*VSphere, error) {
	hostName, err := os.Hostname()
	if err != nil {
		klog.Errorf("Failed to get hostname. err: %+v", err)
		return nil, err
	}

	vmUUID, err := GetVMUUID()
	if err != nil {
		klog.Errorf("Failed to get uuid. err: %+v", err)
		return nil, err
	}

	return &VSphere{hostName: hostName, vmUUID: vmUUID}, nil
}
// populateVsphereInstanceMap validates cfg and builds one VSphereInstance per
// vCenter server, keyed by server address.
//
// cfg is modified in place:
//   - when both SecretName and SecretNamespace are set, every configured
//     username/password is blanked (credentials come from the secret later);
//   - in the old single-vCenter format (no VirtualCenter sections) the
//     Workspace section is filled from the Global section;
//   - in the per-vCenter format, empty per-vCenter user, password, port,
//     datacenters and round tripper count inherit the Global values.
//
// It returns ErrUsernameMissing / ErrPasswordMissing when credentials are
// required but absent, and a descriptive error for any other missing
// mandatory setting.
func populateVsphereInstanceMap(cfg *VSphereConfig) (map[string]*VSphereInstance, error) {
	instances := make(map[string]*VSphereInstance)
	global := &cfg.Global

	useSecret := global.SecretName != "" && global.SecretNamespace != ""
	if !useSecret {
		klog.Warningf("SecretName and/or SecretNamespace is not provided. " +
			"VCP will use username and password from config file")
	} else {
		if global.User != "" {
			klog.Warning("Global.User and Secret info provided. VCP will use secret to get credentials")
			global.User = ""
		}
		if global.Password != "" {
			klog.Warning("Global.Password and Secret info provided. VCP will use secret to get credentials")
			global.Password = ""
		}
	}

	// Old format: vsphere.conf has no VirtualCenter sections, so everything
	// comes from the Global section.
	if len(cfg.VirtualCenter) == 0 {
		klog.V(4).Infof("Config is not per virtual center and is in old format.")
		if !useSecret {
			switch {
			case global.User == "":
				klog.Error("Global.User is empty!")
				return nil, ErrUsernameMissing
			case global.Password == "":
				klog.Error("Global.Password is empty!")
				return nil, ErrPasswordMissing
			}
		}
		switch {
		case global.WorkingDir == "":
			klog.Error("Global.WorkingDir is empty!")
			return nil, errors.New("Global.WorkingDir is empty!")
		case global.VCenterIP == "":
			klog.Error("Global.VCenterIP is empty!")
			return nil, errors.New("Global.VCenterIP is empty!")
		case global.Datacenter == "":
			klog.Error("Global.Datacenter is empty!")
			return nil, errors.New("Global.Datacenter is empty!")
		}

		workspace := &cfg.Workspace
		workspace.VCenterIP = global.VCenterIP
		workspace.Datacenter = global.Datacenter
		workspace.Folder = global.WorkingDir
		workspace.DefaultDatastore = global.DefaultDatastore

		// Note: If secrets info is provided username and password will be populated
		// once secret is created.
		instances[global.VCenterIP] = &VSphereInstance{
			conn: &vclib.VSphereConnection{
				Username:          global.User,
				Password:          global.Password,
				Hostname:          global.VCenterIP,
				Insecure:          global.InsecureFlag,
				RoundTripperCount: global.RoundTripperCount,
				Port:              global.VCenterPort,
				CACert:            global.CAFile,
				Thumbprint:        global.Thumbprint,
			},
			cfg: &VirtualCenterConfig{
				User:              global.User,
				Password:          global.Password,
				VCenterPort:       global.VCenterPort,
				Datacenters:       global.Datacenter,
				RoundTripperCount: global.RoundTripperCount,
				Thumbprint:        global.Thumbprint,
			},
		}
		return instances, nil
	}

	// Per-vCenter format: the workspace must be fully specified.
	if cfg.Workspace.VCenterIP == "" || cfg.Workspace.Folder == "" || cfg.Workspace.Datacenter == "" {
		msg := fmt.Sprintf("All fields in workspace are mandatory."+
			" vsphere.conf does not have the workspace specified correctly. cfg.Workspace: %+v", cfg.Workspace)
		klog.Error(msg)
		return nil, errors.New(msg)
	}

	for vcServer, vcConfig := range cfg.VirtualCenter {
		klog.V(4).Infof("Initializing vc server %s", vcServer)
		if vcServer == "" {
			klog.Error("vsphere.conf does not have the VirtualCenter IP address specified")
			return nil, errors.New("vsphere.conf does not have the VirtualCenter IP address specified")
		}

		if useSecret {
			if vcConfig.User != "" {
				klog.Warningf("vcConfig.User for server %s and Secret info provided. VCP will use secret to get credentials", vcServer)
				vcConfig.User = ""
			}
			if vcConfig.Password != "" {
				klog.Warningf("vcConfig.Password for server %s and Secret info provided. VCP will use secret to get credentials", vcServer)
				vcConfig.Password = ""
			}
		} else {
			if vcConfig.User == "" {
				if global.User == "" {
					klog.Errorf("vcConfig.User is empty for vc %s!", vcServer)
					return nil, ErrUsernameMissing
				}
				vcConfig.User = global.User
			}
			if vcConfig.Password == "" {
				if global.Password == "" {
					klog.Errorf("vcConfig.Password is empty for vc %s!", vcServer)
					return nil, ErrPasswordMissing
				}
				vcConfig.Password = global.Password
			}
		}

		if vcConfig.VCenterPort == "" {
			vcConfig.VCenterPort = global.VCenterPort
		}
		if vcConfig.Datacenters == "" {
			vcConfig.Datacenters = global.Datacenters
			if vcConfig.Datacenters == "" {
				// cfg.Global.Datacenter is deprecated, so giving it the last preference.
				vcConfig.Datacenters = global.Datacenter
			}
		}
		if vcConfig.RoundTripperCount == 0 {
			vcConfig.RoundTripperCount = global.RoundTripperCount
		}

		// Note: If secrets info is provided username and password will be populated
		// once secret is created.
		instances[vcServer] = &VSphereInstance{
			conn: &vclib.VSphereConnection{
				Username:          vcConfig.User,
				Password:          vcConfig.Password,
				Hostname:          vcServer,
				Insecure:          global.InsecureFlag,
				RoundTripperCount: vcConfig.RoundTripperCount,
				Port:              vcConfig.VCenterPort,
				CACert:            global.CAFile,
				Thumbprint:        vcConfig.Thumbprint,
			},
			cfg: vcConfig,
		}
	}
	return instances, nil
}
// getVMUUID allows tests to override GetVMUUID. newControllerNode calls it
// when the vm-uuid config option is empty.
var getVMUUID = GetVMUUID
// newControllerNode builds the provider from a parsed vsphere.conf.
// After buildVSphereFromConfig succeeds it records the local host name and
// the VM UUID (the vm-uuid option when set, otherwise getVMUUID), and
// registers logout as a finalizer on the returned object.
func newControllerNode(cfg VSphereConfig) (*VSphere, error) {
	vs, err := buildVSphereFromConfig(cfg)
	if err != nil {
		return nil, err
	}

	if vs.hostName, err = os.Hostname(); err != nil {
		klog.Errorf("Failed to get hostname. err: %+v", err)
		return nil, err
	}

	vs.vmUUID = cfg.Global.VMUUID
	if vs.vmUUID == "" {
		if vs.vmUUID, err = getVMUUID(); err != nil {
			klog.Errorf("Failed to get uuid. err: %+v", err)
			return nil, err
		}
	}

	runtime.SetFinalizer(vs, logout)
	return vs, nil
}
// buildVSphereFromConfig normalizes cfg and assembles a VSphere from it.
// Defaults applied to its own copy of cfg: the PVSCSI controller type, a
// cleaned WorkingDir, RoundTripperDefaultCount and port "443". An unsupported
// SCSI controller type or any error from populateVsphereInstanceMap aborts
// the build.
func buildVSphereFromConfig(cfg VSphereConfig) (*VSphere, error) {
	secretConfigured := cfg.Global.SecretName != "" && cfg.Global.SecretNamespace != ""

	switch {
	case cfg.Disk.SCSIControllerType == "":
		cfg.Disk.SCSIControllerType = vclib.PVSCSIControllerType
	case !vclib.CheckControllerSupported(cfg.Disk.SCSIControllerType):
		klog.Errorf("%v is not a supported SCSI Controller type. Please configure 'lsilogic-sas' OR 'pvscsi'", cfg.Disk.SCSIControllerType)
		return nil, errors.New("Controller type not supported. Please configure 'lsilogic-sas' OR 'pvscsi'")
	}

	// path.Clean turns "" into ".", so only clean a directory that was set.
	if dir := cfg.Global.WorkingDir; dir != "" {
		cfg.Global.WorkingDir = path.Clean(dir)
	}
	if cfg.Global.RoundTripperCount == 0 {
		cfg.Global.RoundTripperCount = RoundTripperDefaultCount
	}
	if cfg.Global.VCenterPort == "" {
		cfg.Global.VCenterPort = "443"
	}

	instances, err := populateVsphereInstanceMap(&cfg)
	if err != nil {
		return nil, err
	}

	manager := &NodeManager{
		vsphereInstanceMap: instances,
		nodeInfoMap:        make(map[string]*NodeInfo),
		registeredNodes:    make(map[string]*v1.Node),
	}
	return &VSphere{
		vsphereInstanceMap:   instances,
		nodeManager:          manager,
		isSecretInfoProvided: secretConfigured,
		cfg:                  &cfg,
	}, nil
}
// logout calls Logout on the connection of every known vSphere instance.
// newControllerNode registers it as the finalizer of the VSphere object.
func logout(vs *VSphere) {
	for server := range vs.vsphereInstanceMap {
		vs.vsphereInstanceMap[server].conn.Logout(context.TODO())
	}
}
// Instances returns an implementation of Instances for vSphere.
// The VSphere object itself serves as the implementation.
func (vs *VSphere) Instances() (cloudprovider.Instances, bool) {
	return vs, true
}
// getLocalIP collects the node addresses of the local machine.
//
// Every non-loopback IPv4 address that sits on an interface whose MAC
// address starts with one of the VMware OUIs (MacOuiVC, MacOuiEsx) is
// reported twice: once as NodeExternalIP and once as NodeInternalIP.
// Interfaces whose addresses cannot be listed are logged and skipped.
// An error is returned only when the interfaces cannot be enumerated.
func getLocalIP() ([]v1.NodeAddress, error) {
	addrs := []v1.NodeAddress{}
	ifaces, err := net.Interfaces()
	if err != nil {
		klog.Errorf("net.Interfaces() failed for NodeAddresses - %v", err)
		return nil, err
	}
	for _, iface := range ifaces {
		localAddrs, err := iface.Addrs()
		if err != nil {
			klog.Warningf("Failed to extract addresses for NodeAddresses - %v", err)
			continue
		}
		// Filter external IP by MAC address OUIs from vCenter and from ESX.
		// The MAC is a property of the interface, so check it once per interface.
		mac := iface.HardwareAddr.String()
		hasVMwareOUI := strings.HasPrefix(mac, MacOuiVC) || strings.HasPrefix(mac, MacOuiEsx)
		for _, addr := range localAddrs {
			ipnet, ok := addr.(*net.IPNet)
			if !ok || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
				continue
			}
			ip := ipnet.IP.String()
			if !hasVMwareOUI {
				klog.V(4).Infof("Find local IP address %v but skip it, interface MAC %v has no VMware OUI", ip, mac)
				continue
			}
			v1helper.AddToNodeAddresses(&addrs,
				v1.NodeAddress{
					Type:    v1.NodeExternalIP,
					Address: ip,
				},
				v1.NodeAddress{
					Type:    v1.NodeInternalIP,
					Address: ip,
				},
			)
			// The previous log printed a never-assigned address type (always
			// empty); report the types that were actually recorded.
			klog.V(4).Infof("Find local IP address %v and set type to %v and %v", ip, v1.NodeExternalIP, v1.NodeInternalIP)
		}
	}
	return addrs, nil
}
// getVSphereInstance asks the node manager for the vSphere instance that
// hosts nodeName and returns a pointer to the returned copy. A lookup
// failure is logged and passed on unchanged.
func (vs *VSphere) getVSphereInstance(nodeName k8stypes.NodeName) (*VSphereInstance, error) {
	instance, err := vs.nodeManager.GetVSphereInstance(nodeName)
	if err == nil {
		return &instance, nil
	}
	klog.Errorf("Cannot find node %q in cache. Node not found!!!", nodeName)
	return nil, err
}
// getVSphereInstanceForServer returns the configured instance for vcServer
// after making sure its connection is established via vcConnect.
//
// It returns an error when vcServer is not a key of vsphereInstanceMap or
// when connecting fails. The ctx parameter is kept in second position for
// compatibility with existing callers.
func (vs *VSphere) getVSphereInstanceForServer(vcServer string, ctx context.Context) (*VSphereInstance, error) {
	vsphereIns, ok := vs.vsphereInstanceMap[vcServer]
	if !ok {
		klog.Errorf("cannot find vcServer %q in cache. VC not found!!!", vcServer)
		// The looked-up key is a vCenter server, not a node.
		return nil, fmt.Errorf("cannot find vcServer %q in vsphere configuration map", vcServer)
	}
	// Ensure client is logged in and session is valid
	if err := vs.nodeManager.vcConnect(ctx, vsphereIns); err != nil {
		klog.Errorf("failed connecting to vcServer %q with error %+v", vcServer, err)
		return nil, err
	}
	return vsphereIns, nil
}
// getVMFromNodeName returns the VM object stored in the node manager's info
// for nodeName. ctx is accepted but not used by this lookup.
func (vs *VSphere) getVMFromNodeName(ctx context.Context, nodeName k8stypes.NodeName) (*vclib.VirtualMachine, error) {
	info, err := vs.nodeManager.GetNodeInfo(nodeName)
	if err == nil {
		return info.vm, nil
	}
	return nil, err
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
//
// For the local node it returns the addresses found by getLocalIP plus a
// NodeHostName entry. For any other node it requires a config (vs.cfg);
// without one it returns cloudprovider.InstanceNotFound. Otherwise it reads
// the VM's guest.net property and reports every IPv4 address on the
// configured public network as both NodeExternalIP and NodeInternalIP.
func (vs *VSphere) NodeAddresses(ctx context.Context, nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) {
	// Get local IP addresses if node is local node
	if vs.hostName == convertToString(nodeName) {
		addrs, err := getLocalIP()
		if err != nil {
			return nil, err
		}
		// add the hostname address
		v1helper.AddToNodeAddresses(&addrs, v1.NodeAddress{Type: v1.NodeHostName, Address: vs.hostName})
		return addrs, nil
	}
	if vs.cfg == nil {
		return nil, cloudprovider.InstanceNotFound
	}
	// Below logic can be executed only on master as VC details are present.
	addrs := []v1.NodeAddress{}
	// NOTE(review): the caller's ctx is replaced by a fresh background
	// context here, so caller cancellation is not observed — confirm intent.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	vsi, err := vs.getVSphereInstance(nodeName)
	if err != nil {
		return nil, err
	}
	// Ensure client is logged in and session is valid
	err = vs.nodeManager.vcConnect(ctx, vsi)
	if err != nil {
		return nil, err
	}
	vm, err := vs.getVMFromNodeName(ctx, nodeName)
	if err != nil {
		klog.Errorf("Failed to get VM object for node: %q. err: %+v", convertToString(nodeName), err)
		return nil, err
	}
	vmMoList, err := vm.Datacenter.GetVMMoList(ctx, []*vclib.VirtualMachine{vm}, []string{"guest.net"})
	if err != nil {
		klog.Errorf("Failed to get VM Managed object with property guest.net for node: %q. err: %+v", convertToString(nodeName), err)
		return nil, err
	}
	// Guard the index below: an empty result would otherwise panic.
	if len(vmMoList) == 0 {
		err = fmt.Errorf("no VM managed object returned for node: %q", convertToString(nodeName))
		klog.Errorf("Failed to get VM Managed object with property guest.net for node: %q. err: %+v", convertToString(nodeName), err)
		return nil, err
	}
	// retrieve VM's ip(s)
	for _, v := range vmMoList[0].Guest.Net {
		if vs.cfg.Network.PublicNetwork != v.Network {
			continue
		}
		for _, ip := range v.IpAddress {
			// Only IPv4 addresses are reported.
			if net.ParseIP(ip).To4() == nil {
				continue
			}
			v1helper.AddToNodeAddresses(&addrs,
				v1.NodeAddress{
					Type:    v1.NodeExternalIP,
					Address: ip,
				}, v1.NodeAddress{
					Type:    v1.NodeInternalIP,
					Address: ip,
				},
			)
		}
	}
	return addrs, nil
}
// NodeAddressesByProviderID returns the node addresses of an instance with the specified unique providerID.
// This method will not be called from the node that is requesting this ID, i.e. metadata service
// and other local methods cannot be used here.
//
// NOTE(review): providerID is converted directly to a node name, whereas
// InstanceExistsByProviderID first resolves it through
// GetNodeNameFromProviderID — confirm whether the same translation is needed here.
func (vs *VSphere) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
	return vs.NodeAddresses(ctx, convertToK8sType(providerID))
}
// AddSSHKeyToAllInstances adds an SSH key to all instances.
// It always returns cloudprovider.NotImplemented.
func (vs *VSphere) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
	return cloudprovider.NotImplemented
}
// CurrentNodeName gives the current node name.
// The hostname argument is ignored; the result is always vs.hostName.
func (vs *VSphere) CurrentNodeName(ctx context.Context, hostname string) (k8stypes.NodeName, error) {
	return convertToK8sType(vs.hostName), nil
}
// convertToString returns nodeName as a plain string.
func convertToString(nodeName k8stypes.NodeName) string {
	return string(nodeName)
}
// convertToK8sType returns vmName as a k8stypes.NodeName.
func convertToK8sType(vmName string) k8stypes.NodeName {
	return k8stypes.NodeName(vmName)
}
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
//
// The provider ID is resolved to a node name, and the instance counts as
// existing exactly when InstanceID succeeds for that node; any error from
// either step is returned together with false.
func (vs *VSphere) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
	nodeName, err := vs.GetNodeNameFromProviderID(providerID)
	if err != nil {
		klog.Errorf("Error while getting nodename for providerID %s", providerID)
		return false, err
	}
	if _, err = vs.InstanceID(ctx, convertToK8sType(nodeName)); err != nil {
		return false, err
	}
	return true, nil
}
// InstanceShutdownByProviderID returns true if the instance is in safe state to detach volumes.
//
// The provider ID is resolved to a node name, the owning vCenter connection
// is established, and the result is the negation of the VM's IsActive
// status. Any failure along the way is returned together with false.
func (vs *VSphere) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
	nodeName, err := vs.GetNodeNameFromProviderID(providerID)
	if err != nil {
		klog.Errorf("Error while getting nodename for providerID %s", providerID)
		return false, err
	}
	vsi, err := vs.getVSphereInstance(convertToK8sType(nodeName))
	if err != nil {
		return false, err
	}
	// Ensure client is logged in and session is valid
	if err := vs.nodeManager.vcConnect(ctx, vsi); err != nil {
		return false, err
	}
	vm, err := vs.getVMFromNodeName(ctx, convertToK8sType(nodeName))
	if err != nil {
		// "%+v" (not "+%v") so the error is formatted rather than prefixed with '+'.
		klog.Errorf("Failed to get VM object for node: %q. err: %+v", nodeName, err)
		return false, err
	}
	isActive, err := vm.IsActive(ctx)
	if err != nil {
		klog.Errorf("Failed to check whether node %q is active. err: %+v.", nodeName, err)
		return false, err
	}
	return !isActive, nil
}
// InstanceID returns the cloud provider ID of the node with the specified Name.
//
// For the local node the stored vmUUID is returned directly. For other nodes
// a config is required; the VM is looked up and must be active, otherwise
// cloudprovider.InstanceNotFound is returned. When the lookup fails with a
// managed-object-not-found error, the node is rediscovered once and the
// lookup retried; if rediscovery reports vclib.ErrNoVMFound the result is
// cloudprovider.InstanceNotFound.
func (vs *VSphere) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
	instanceIDInternal := func() (string, error) {
		if vs.hostName == convertToString(nodeName) {
			return vs.vmUUID, nil
		}
		// Below logic can be performed only on master node where VC details are preset.
		if vs.cfg == nil {
			return "", fmt.Errorf("The current node can't determine InstanceID for %q", convertToString(nodeName))
		}
		// Create context
		// NOTE(review): this shadows the caller's ctx with a fresh background
		// context, so caller cancellation is not observed — confirm intent.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		vsi, err := vs.getVSphereInstance(nodeName)
		if err != nil {
			return "", err
		}
		// Ensure client is logged in and session is valid
		err = vs.nodeManager.vcConnect(ctx, vsi)
		if err != nil {
			return "", err
		}
		vm, err := vs.getVMFromNodeName(ctx, nodeName)
		if err != nil {
			klog.Errorf("Failed to get VM object for node: %q. err: %+v", convertToString(nodeName), err)
			return "", err
		}
		isActive, err := vm.IsActive(ctx)
		if err != nil {
			klog.Errorf("Failed to check whether node %q is active. err: %+v.", convertToString(nodeName), err)
			return "", err
		}
		if isActive {
			// NOTE(review): this returns the local VM's UUID (vs.vmUUID) even
			// though nodeName is not the local host on this path — confirm
			// whether the looked-up VM's own UUID was intended.
			return vs.vmUUID, nil
		}
		klog.Warningf("The VM: %s is not in %s state", convertToString(nodeName), vclib.ActivePowerState)
		return "", cloudprovider.InstanceNotFound
	}

	instanceID, err := instanceIDInternal()
	if err != nil && vclib.IsManagedObjectNotFoundError(err) {
		// The cached VM reference is stale; rediscover the node and retry once.
		err = vs.nodeManager.RediscoverNode(nodeName)
		if err == nil {
			klog.V(4).Infof("InstanceID: Found node %q", convertToString(nodeName))
			instanceID, err = instanceIDInternal()
		} else if err == vclib.ErrNoVMFound {
			return "", cloudprovider.InstanceNotFound
		}
	}
	return instanceID, err
}
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID.
// This method will not be called from the node that is requesting this ID, i.e. metadata service
// and other local methods cannot be used here.
// This implementation always returns an empty type and a nil error.
func (vs *VSphere) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
	return "", nil
}
// InstanceType returns the instance type of the named node.
// This implementation always returns an empty type and a nil error.
func (vs *VSphere) InstanceType(ctx context.Context, name k8stypes.NodeName) (string, error) {
	return "", nil
}
// Clusters returns a nil Clusters implementation together with true.
// NOTE(review): LoadBalancer and Routes in this file return (nil, false) for
// unsupported interfaces; returning true with a nil implementation here may
// be unintended — confirm before relying on the boolean.
func (vs *VSphere) Clusters() (cloudprovider.Clusters, bool) {
	return nil, true
}
// ProviderName returns the cloud provider ID, i.e. the ProviderName constant.
func (vs *VSphere) ProviderName() string {
	return ProviderName
}
// LoadBalancer returns an implementation of LoadBalancer for vSphere.
// It always returns (nil, false): no load balancer implementation is offered.
func (vs *VSphere) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
	return nil, false
}
// Zones returns an implementation of Zones for vSphere.
// The VSphere object itself is returned when a config is present; without
// one (vs.cfg == nil) zones are reported as unsupported.
func (vs *VSphere) Zones() (cloudprovider.Zones, bool) {
	if vs.cfg != nil {
		return vs, true
	}
	klog.V(1).Info("The vSphere cloud provider does not support zones")
	return nil, false
}
// Routes returns (nil, false) since the interface is not supported for vSphere.
func (vs *VSphere) Routes() (cloudprovider.Routes, bool) {
	return nil, false
}
// AttachDisk attaches given virtual disk volume to the compute running kubelet.
//
// The current node (vs.hostName) is used when nodeName is empty. The disk is
// attached with the PVSCSI controller type and the given storage policy
// name. When the attempt fails with a managed-object-not-found error, the
// node is rediscovered once and the attach retried. The outcome is recorded
// as a vSphere metric. It returns the UUID of the attached disk.
func (vs *VSphere) AttachDisk(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error) {
	attachDiskInternal := func(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error) {
		if nodeName == "" {
			nodeName = convertToK8sType(vs.hostName)
		}
		// Create context
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		vsi, err := vs.getVSphereInstance(nodeName)
		if err != nil {
			return "", err
		}
		// Ensure client is logged in and session is valid
		err = vs.nodeManager.vcConnect(ctx, vsi)
		if err != nil {
			return "", err
		}
		vm, err := vs.getVMFromNodeName(ctx, nodeName)
		if err != nil {
			klog.Errorf("Failed to get VM object for node: %q. err: %+v", convertToString(nodeName), err)
			return "", err
		}
		diskUUID, err = vm.AttachDisk(ctx, vmDiskPath, &vclib.VolumeOptions{SCSIControllerType: vclib.PVSCSIControllerType, StoragePolicyName: storagePolicyName})
		if err != nil {
			klog.Errorf("Failed to attach disk: %s for node: %s. err: %+v", vmDiskPath, convertToString(nodeName), err)
			return "", err
		}
		return diskUUID, nil
	}

	requestTime := time.Now()
	diskUUID, err = attachDiskInternal(vmDiskPath, storagePolicyName, nodeName)
	if err != nil && vclib.IsManagedObjectNotFoundError(err) {
		// The cached VM reference is stale; rediscover the node and retry once.
		err = vs.nodeManager.RediscoverNode(nodeName)
		if err == nil {
			klog.V(4).Infof("AttachDisk: Found node %q", convertToString(nodeName))
			diskUUID, err = attachDiskInternal(vmDiskPath, storagePolicyName, nodeName)
			klog.V(4).Infof("AttachDisk: Retry: diskUUID %s, err %+v", diskUUID, err)
		}
	}
	// err is nil on success, so it must be printed with %v; %s would render
	// a nil error as "%!s(<nil>)".
	klog.V(4).Infof("AttachDisk executed for node %s and volume %s with diskUUID %s. Err: %v", convertToString(nodeName), vmDiskPath, diskUUID, err)
	vclib.RecordvSphereMetric(vclib.OperationAttachVolume, requestTime, err)
	return diskUUID, err
}
// DetachDisk detaches given virtual disk volume from the compute running kubelet.
//
// The current node (vs.hostName) is used when nodeName is empty. If the node
// lookup reports vclib.ErrNoVMFound the disk is treated as already detached
// and nil is returned. When the attempt fails with a
// managed-object-not-found error, the node is rediscovered once and the
// detach retried. The outcome is recorded as a vSphere metric.
func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error {
	detachDiskInternal := func(volPath string, nodeName k8stypes.NodeName) error {
		if nodeName == "" {
			nodeName = convertToK8sType(vs.hostName)
		}
		// Create context
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		vsi, err := vs.getVSphereInstance(nodeName)
		if err != nil {
			// If node doesn't exist, disk is already detached from node.
			if err == vclib.ErrNoVMFound {
				klog.Infof("Node %q does not exist, disk %s is already detached from node.", convertToString(nodeName), volPath)
				return nil
			}
			return err
		}
		// Ensure client is logged in and session is valid
		err = vs.nodeManager.vcConnect(ctx, vsi)
		if err != nil {
			return err
		}
		vm, err := vs.getVMFromNodeName(ctx, nodeName)
		if err != nil {
			// If node doesn't exist, disk is already detached from node.
			if err == vclib.ErrNoVMFound {
				klog.Infof("Node %q does not exist, disk %s is already detached from node.", convertToString(nodeName), volPath)
				return nil
			}
			// "%+v" (not "+%v") so the error is formatted rather than prefixed with '+'.
			klog.Errorf("Failed to get VM object for node: %q. err: %+v", convertToString(nodeName), err)
			return err
		}
		err = vm.DetachDisk(ctx, volPath)
		if err != nil {
			klog.Errorf("Failed to detach disk: %s for node: %s. err: %+v", volPath, convertToString(nodeName), err)
			return err
		}
		return nil
	}

	requestTime := time.Now()
	err := detachDiskInternal(volPath, nodeName)
	if err != nil && vclib.IsManagedObjectNotFoundError(err) {
		// The cached VM reference is stale; rediscover the node and retry once.
		err = vs.nodeManager.RediscoverNode(nodeName)
		if err == nil {
			err = detachDiskInternal(volPath, nodeName)
		}
	}
	vclib.RecordvSphereMetric(vclib.OperationDetachVolume, requestTime, err)
	return err
}
// DiskIsAttached returns if disk is attached to the VM using controllers supported by the plugin.
//
// The current node (vs.hostName) is used when nodeName is empty. If the VM
// lookup or a later rediscovery reports vclib.ErrNoVMFound, the disk is
// reported as not attached with a nil error. When the check fails with a
// managed-object-not-found error, the node is rediscovered once and the
// check retried. The outcome is recorded as a vSphere metric.
func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error) {
	diskIsAttachedInternal := func(volPath string, nodeName k8stypes.NodeName) (bool, error) {
		var vSphereInstance string
		if nodeName == "" {
			vSphereInstance = vs.hostName
			nodeName = convertToK8sType(vSphereInstance)
		} else {
			vSphereInstance = convertToString(nodeName)
		}
		// Create context
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		vsi, err := vs.getVSphereInstance(nodeName)
		if err != nil {
			return false, err
		}
		// Ensure client is logged in and session is valid
		err = vs.nodeManager.vcConnect(ctx, vsi)
		if err != nil {
			return false, err
		}
		vm, err := vs.getVMFromNodeName(ctx, nodeName)
		if err != nil {
			if err == vclib.ErrNoVMFound {
				klog.Warningf("Node %q does not exist, vsphere CP will assume disk %v is not attached to it.", nodeName, volPath)
				// make the disk as detached and return false without error.
				return false, nil
			}
			klog.Errorf("Failed to get VM object for node: %q. err: %+v", vSphereInstance, err)
			return false, err
		}
		volPath = vclib.RemoveStorageClusterORFolderNameFromVDiskPath(volPath)
		attached, err := vm.IsDiskAttached(ctx, volPath)
		if err != nil {
			// Include the error itself; the previous message omitted it.
			klog.Errorf("DiskIsAttached failed to determine whether disk %q is still attached on node %q. err: %+v",
				volPath,
				vSphereInstance,
				err)
		}
		// err is nil on success, so print it with %v; %q would render a nil
		// error as "%!q(<nil>)".
		klog.V(4).Infof("DiskIsAttached result: %v and error: %v, for volume: %q", attached, err, volPath)
		return attached, err
	}

	requestTime := time.Now()
	isAttached, err := diskIsAttachedInternal(volPath, nodeName)
	if err != nil && vclib.IsManagedObjectNotFoundError(err) {
		// The cached VM reference is stale; rediscover the node and retry once.
		err = vs.nodeManager.RediscoverNode(nodeName)
		if err == vclib.ErrNoVMFound {
			isAttached, err = false, nil
		} else if err == nil {
			isAttached, err = diskIsAttachedInternal(volPath, nodeName)
		}
	}
	vclib.RecordvSphereMetric(vclib.OperationDiskIsAttached, requestTime, err)
	return isAttached, err
}
// DisksAreAttached reports, for the nodes in nodeVolumes, which of the listed
// volume paths are attached to the node VM using controllers supported by the plugin.
//
// The check proceeds as follows:
//  1. Converts volPaths into canonical form so that they can be compared with the VM device path.
//  2. Segregates nodes by the vCenter and Datacenter they are present in. This reduces calls to VC.
//  3. Creates one goroutine per VC-DC to find whether disks are attached to the nodes.
//  4. VMs that are not found or have migrated are collected into a retry list.
//  5. After the goroutines have finished:
//     a. VMs that need a retry are rediscovered and the operation is run again for only those VMs.
//     b. VMs that have been removed from the vSphere inventory are ignored.
//
// An empty nodeVolumes yields an empty, non-nil result. On any failure the
// returned map is nil and the error is non-nil. The outcome is always recorded
// through vclib.RecordvSphereMetric.
func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
	disksAreAttachedInternal := func(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
		// disksAreAttach checks whether disks are attached to the nodes.
		// It segregates the nodes per VC-DC, runs one goroutine per VC-DC and
		// merges the per-goroutine results into attached. The returned slice
		// holds the nodes reported back by checkDiskAttached for a retry.
		disksAreAttach := func(ctx context.Context, nodeVolumes map[k8stypes.NodeName][]string, attached map[string]map[string]bool, retry bool) ([]k8stypes.NodeName, error) {
			var (
				wg                sync.WaitGroup
				localAttachedMaps []map[string]map[string]bool

				// resultMutex guards globalErr and nodesToRetry, which are
				// written by every worker goroutine.
				resultMutex  sync.Mutex
				globalErr    error
				nodesToRetry []k8stypes.NodeName
			)

			// Segregate nodes according to VC-DC
			dcNodes := make(map[string][]k8stypes.NodeName)
			for nodeName := range nodeVolumes {
				nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
				if err != nil {
					// Log the node name: the returned node info is not usable on error.
					klog.Errorf("Failed to get node info for node %q. err: %+v", nodeName, err)
					return nodesToRetry, err
				}
				vcDC := nodeInfo.vcServer + nodeInfo.dataCenter.String()
				dcNodes[vcDC] = append(dcNodes[vcDC], nodeName)
			}

			for _, nodes := range dcNodes {
				localAttachedMap := make(map[string]map[string]bool)
				localAttachedMaps = append(localAttachedMaps, localAttachedMap)
				// Register the worker before it starts; calling Add after the
				// goroutine is launched lets Done run first and panic on a
				// negative counter.
				wg.Add(1)
				// Start go routines per VC-DC to check disks are attached.
				// The per-iteration values are passed as arguments so that each
				// goroutine works on its own node list instead of sharing the
				// range variable.
				go func(nodes []k8stypes.NodeName, localAttachedMap map[string]map[string]bool) {
					defer wg.Done()
					nodesToRetryLocal, err := vs.checkDiskAttached(ctx, nodes, nodeVolumes, localAttachedMap, retry)

					resultMutex.Lock()
					defer resultMutex.Unlock()
					// "Managed object not found" is handled through the retry
					// list, so it is not treated as a failure here.
					if err != nil && !vclib.IsManagedObjectNotFoundError(err) {
						globalErr = err
						klog.Errorf("Failed to check disk attached for nodes: %+v. err: %+v", nodes, err)
					}
					nodesToRetry = append(nodesToRetry, nodesToRetryLocal...)
				}(nodes, localAttachedMap)
			}
			wg.Wait()

			if globalErr != nil {
				return nodesToRetry, globalErr
			}
			for _, localAttachedMap := range localAttachedMaps {
				for key, value := range localAttachedMap {
					attached[key] = value
				}
			}
			return nodesToRetry, nil
		}

		klog.V(4).Infof("Starting DisksAreAttached API for vSphere with nodeVolumes: %+v", nodeVolumes)
		// Create context
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		disksAttached := make(map[k8stypes.NodeName]map[string]bool)
		if len(nodeVolumes) == 0 {
			return disksAttached, nil
		}

		// Convert VolPaths into canonical form so that it can be compared with the VM device path.
		vmVolumes, err := vs.convertVolPathsToDevicePaths(ctx, nodeVolumes)
		if err != nil {
			klog.Errorf("Failed to convert volPaths to devicePaths: %+v. err: %+v", nodeVolumes, err)
			return nil, err
		}

		attached := make(map[string]map[string]bool)
		nodesToRetry, err := disksAreAttach(ctx, vmVolumes, attached, false)
		if err != nil {
			return nil, err
		}

		if len(nodesToRetry) != 0 {
			// Rediscover the nodes which need to be retried
			remainingNodesVolumes := make(map[k8stypes.NodeName][]string)
			for _, nodeName := range nodesToRetry {
				err = vs.nodeManager.RediscoverNode(nodeName)
				if err != nil {
					if err == vclib.ErrNoVMFound {
						// The VM is gone from the inventory: ignore the node.
						klog.V(4).Infof("node %s not found. err: %+v", nodeName, err)
						continue
					}
					klog.Errorf("Failed to rediscover node %s. err: %+v", nodeName, err)
					return nil, err
				}
				// Retry with the converted device paths, the same form that
				// was used for the first pass.
				remainingNodesVolumes[nodeName] = vmVolumes[nodeName]
			}

			// If some remaining nodes are still registered
			if len(remainingNodesVolumes) != 0 {
				nodesToRetry, err = disksAreAttach(ctx, remainingNodesVolumes, attached, true)
				if err == nil && len(nodesToRetry) != 0 {
					// Never report success with a nil result.
					err = fmt.Errorf("nodes %+v still need to be retried after rediscovery", nodesToRetry)
				}
				if err != nil {
					klog.Errorf("Failed to retry disksAreAttach for nodes %+v. err: %+v", remainingNodesVolumes, err)
					return nil, err
				}
			}
		}

		// Publish the results whether or not a retry pass was needed.
		for nodeName, volPaths := range attached {
			disksAttached[convertToK8sType(nodeName)] = volPaths
		}
		klog.V(4).Infof("DisksAreAttach successfully executed. result: %+v", attached)
		return disksAttached, nil
	}

	requestTime := time.Now()
	attached, err := disksAreAttachedInternal(nodeVolumes)
	vclib.RecordvSphereMetric(vclib.OperationDisksAreAttached, requestTime, err)
	return attached, err
}
// CreateVolume creates a volume of given size (in KiB) and returns the volume path.
// If volumeOptions.Datastore is part of a datastore cluster, for example
// [DatastoreCluster/sharedVmfs-0], the returned value is
// [DatastoreCluster/sharedVmfs-0] kubevols/<volume-name>.vmdk,
// otherwise it is [sharedVmfs-0] kubevols/<volume-name>.vmdk.
//
// volumeOptions.Datastore is overwritten with the datastore that was finally
// selected. The outcome is recorded through vclib.RecordCreateVolumeMetric.
func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
	klog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions)

	createVolumeInternal := func(opts *vclib.VolumeOptions) (string, error) {
		// Start from the requested datastore and fall back to the workspace
		// default when none was requested.
		dsName := opts.Datastore
		if dsName == "" {
			dsName = vs.cfg.Workspace.DefaultDatastore
		}
		dsName = strings.TrimSpace(dsName)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
		if err != nil {
			return "", err
		}
		dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
		if err != nil {
			return "", err
		}

		var vmOptions *vclib.VMOptions
		usesStoragePolicy := opts.VSANStorageProfileData != "" || opts.StoragePolicyName != ""
		if usesStoragePolicy {
			// A read lock lets multiple PVC requests proceed simultaneously;
			// it is held until this closure returns.
			cleanUpDummyVMLock.RLock()
			defer cleanUpDummyVMLock.RUnlock()

			// Launch the background routine that removes stale dummy VMs.
			// The flag, guarded by its own lock, ensures it is started only once.
			cleanUpRoutineInitLock.Lock()
			if !cleanUpRoutineInitialized {
				klog.V(1).Infof("Starting a clean up routine to remove stale dummy VM's")
				go vs.cleanUpDummyVMs(DummyVMPrefixName)
				cleanUpRoutineInitialized = true
			}
			cleanUpRoutineInitLock.Unlock()

			vmOptions, err = vs.setVMOptions(ctx, dc, vs.cfg.Workspace.ResourcePoolPath)
			if err != nil {
				klog.Errorf("Failed to set VM options requires to create a vsphere volume. err: %+v", err)
				return "", err
			}
		}

		if opts.StoragePolicyName == "" || opts.Datastore != "" {
			// No storage policy, or an explicit datastore was requested:
			// the chosen datastore has to be shared across all node VMs.
			sharedDatastores, err := getSharedDatastoresInK8SCluster(ctx, dc, vs.nodeManager)
			if err != nil {
				klog.Errorf("Failed to get shared datastore: %+v", err)
				return "", err
			}
			isShared := false
			for _, candidate := range sharedDatastores {
				if candidate.Info.Name == dsName {
					isShared = true
					break
				}
			}
			if !isShared {
				msg := fmt.Sprintf("The specified datastore %s is not a shared datastore across node VMs", dsName)
				return "", errors.New(msg)
			}
		} else {
			// A storage policy without an explicit datastore: pick a
			// datastore that is compatible with the policy.
			dsName, err = getPbmCompatibleDatastore(ctx, dc, opts.StoragePolicyName, vs.nodeManager)
			if err != nil {
				klog.Errorf("Failed to get pbm compatible datastore with storagePolicy: %s. err: %+v", opts.StoragePolicyName, err)
				return "", err
			}
		}

		ds, err := dc.GetDatastoreByName(ctx, dsName)
		if err != nil {
			return "", err
		}
		opts.Datastore = dsName

		// Make sure the volume directory exists; an already existing
		// directory is not an error.
		kubeVolsPath := filepath.Clean(ds.Path(VolDir)) + "/"
		if err = ds.CreateDirectory(ctx, kubeVolsPath, false); err != nil && err != vclib.ErrFileAlreadyExist {
			klog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
			return "", err
		}

		disk := diskmanagers.VirtualDisk{
			DiskPath:      kubeVolsPath + opts.Name + ".vmdk",
			VolumeOptions: opts,
			VMOptions:     vmOptions,
		}
		volumePath, err := disk.Create(ctx, ds)
		if err != nil {
			klog.Errorf("Failed to create a vsphere volume with volumeOptions: %+v on datastore: %s. err: %+v", opts, dsName, err)
			return "", err
		}

		// Get the canonical path for the volume path.
		canonicalPath, err := getcanonicalVolumePath(ctx, dc, volumePath)
		if err != nil {
			klog.Errorf("Failed to get canonical vsphere volume path for volume: %s with volumeOptions: %+v on datastore: %s. err: %+v", volumePath, opts, dsName, err)
			return "", err
		}
		if baseName := filepath.Base(dsName); baseName != dsName {
			// The datastore is within a cluster: add the cluster path to the volume path.
			canonicalPath = strings.Replace(canonicalPath, baseName, dsName, 1)
		}
		return canonicalPath, nil
	}

	startedAt := time.Now()
	canonicalVolumePath, err = createVolumeInternal(volumeOptions)
	vclib.RecordCreateVolumeMetric(volumeOptions, startedAt, err)
	klog.V(4).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath)
	return canonicalVolumePath, err
}
// DeleteVolume deletes the virtual disk located at vmDiskPath from the
// datacenter configured for the workspace. The outcome is recorded through
// vclib.RecordvSphereMetric and any failure is returned to the caller.
func (vs *VSphere) DeleteVolume(vmDiskPath string) error {
	klog.V(1).Infof("Starting to delete vSphere volume with vmDiskPath: %s", vmDiskPath)

	deleteVolumeInternal := func(diskPath string) error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
		if err != nil {
			return err
		}
		dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
		if err != nil {
			return err
		}

		// Only the disk path is filled in; the options are left at their
		// zero values.
		disk := diskmanagers.VirtualDisk{
			DiskPath:      diskPath,
			VolumeOptions: &vclib.VolumeOptions{},
			VMOptions:     &vclib.VMOptions{},
		}
		if err = disk.Delete(ctx, dc); err != nil {
			klog.Errorf("Failed to delete vsphere volume with vmDiskPath: %s. err: %+v", diskPath, err)
			return err
		}
		return nil
	}

	startedAt := time.Now()
	err := deleteVolumeInternal(vmDiskPath)
	vclib.RecordvSphereMetric(vclib.OperationDeleteVolume, startedAt, err)
	return err
}
// HasClusterID reports whether the cluster has a clusterID.
// This provider unconditionally answers true.
func (vs *VSphere) HasClusterID() bool {
	const hasClusterID = true
	return hasClusterID
}
// NodeAdded is the notification handler invoked when a node is added to the
// k8s cluster. obj must be a non-nil *v1.Node; anything else is logged as a
// warning and ignored. A valid node is registered with the node manager.
func (vs *VSphere) NodeAdded(obj interface{}) {
	addedNode, isNode := obj.(*v1.Node)
	if !isNode || addedNode == nil {
		klog.Warningf("NodeAdded: unrecognized object %+v", obj)
		return
	}

	klog.V(4).Infof("Node added: %+v", addedNode)
	vs.nodeManager.RegisterNode(addedNode)
}
// NodeDeleted is the notification handler invoked when a node is removed from
// the k8s cluster. obj must be a non-nil *v1.Node; anything else is logged as
// a warning and ignored. A valid node is unregistered from the node manager.
func (vs *VSphere) NodeDeleted(obj interface{}) {
	removedNode, isNode := obj.(*v1.Node)
	if !isNode || removedNode == nil {
		klog.Warningf("NodeDeleted: unrecognized object %+v", obj)
		return
	}

	klog.V(4).Infof("Node deleted: %+v", removedNode)
	vs.nodeManager.UnRegisterNode(removedNode)
}
// NodeManager returns the node manager held by vs.
// It is safe to call on a nil receiver, in which case nil is returned.
func (vs *VSphere) NodeManager() (nodeManager *NodeManager) {
	if vs != nil {
		return vs.nodeManager
	}
	return nil
}
// withTagsClient runs f with a REST client that is logged in using the
// credentials of connection.
//
// A new rest.Client is created on top of connection.Client and logged in with
// connection.Username and connection.Password. If the login fails its error
// is returned and f is not called. Otherwise the result of f is returned and
// the client is logged out afterwards; a failing logout is only logged so that
// it never masks the result of f.
func withTagsClient(ctx context.Context, connection *vclib.VSphereConnection, f func(c *rest.Client) error) error {
	c := rest.NewClient(connection.Client)
	user := url.UserPassword(connection.Username, connection.Password)
	if err := c.Login(ctx, user); err != nil {
		return err
	}
	defer func() {
		if err := c.Logout(ctx); err != nil {
			klog.Errorf("failed to logout: %v", err)
		}
	}()
	return f(c)
}
// GetZone implements Zones.GetZone.
//
// It locates the host that runs the VM identified by vs.vmUUID and walks the
// host's ancestors in the inventory, starting at the host itself. For every
// object the attached tags are listed; a tag whose category name equals
// vs.cfg.Labels.Zone sets the failure domain and a tag whose category name
// equals vs.cfg.Labels.Region sets the region. The walk stops as soon as both
// values are set.
//
// An error is returned when any vSphere call fails, or when a zone or region
// category is configured but no matching tag was found. On error the returned
// Zone is the zero value.
func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
	nodeName, err := vs.CurrentNodeName(ctx, vs.hostName)
	if err != nil {
		klog.Errorf("Cannot get node name: %v", err)
		return cloudprovider.Zone{}, err
	}
	zone := cloudprovider.Zone{}

	vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
	if err != nil {
		klog.Errorf("Cannot connect to vsphere. Get zone for node %s error: %v", nodeName, err)
		return cloudprovider.Zone{}, err
	}
	dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
	if err != nil {
		klog.Errorf("Cannot connect to datacenter. Get zone for node %s error: %v", nodeName, err)
		return cloudprovider.Zone{}, err
	}
	vmHost, err := dc.GetHostByVMUUID(ctx, vs.vmUUID)
	if err != nil {
		klog.Errorf("Cannot find VM runtime host. Get zone for node %s error: %v", nodeName, err)
		return cloudprovider.Zone{}, err
	}

	pc := vsi.conn.Client.ServiceContent.PropertyCollector
	err = withTagsClient(ctx, vsi.conn, func(c *rest.Client) error {
		tagManager := tags.NewManager(c)
		// example result: ["Folder", "Datacenter", "Cluster", "Host"]
		objects, err := mo.Ancestors(ctx, vsi.conn.Client, pc, *vmHost)
		if err != nil {
			return err
		}

		// search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"]
		for i := len(objects) - 1; i >= 0; i-- {
			obj := objects[i]
			attachedTags, err := tagManager.ListAttachedTags(ctx, obj)
			if err != nil {
				klog.Errorf("Cannot list attached tags. Get zone for node %s: %s", nodeName, err)
				return err
			}
			for _, tagID := range attachedTags {
				tag, err := tagManager.GetTag(ctx, tagID)
				if err != nil {
					klog.Errorf("Get tag %s: %s", tagID, err)
					return err
				}
				category, err := tagManager.GetCategory(ctx, tag.CategoryID)
				if err != nil {
					klog.Errorf("Get category %s for tag %s: %s", tag.CategoryID, tagID, err)
					return err
				}

				// A match is the expected outcome, so it is reported at
				// info level rather than as an error.
				found := func() {
					klog.Infof("Found %q tag (%s) for %s attached to %s", category.Name, tag.Name, vs.vmUUID, obj.Reference())
				}
				// NOTE(review): a value that is already set is overwritten
				// when a later tag or a more distant ancestor matches the
				// same category, until both values are set. Confirm this
				// precedence is intended.
				switch {
				case category.Name == vs.cfg.Labels.Zone:
					zone.FailureDomain = tag.Name
					found()
				case category.Name == vs.cfg.Labels.Region:
					zone.Region = tag.Name
					found()
				}

				if zone.FailureDomain != "" && zone.Region != "" {
					return nil
				}
			}
		}

		// A configured category without any matching tag is an error;
		// the region is checked first.
		if zone.Region == "" && vs.cfg.Labels.Region != "" {
			return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID)
		}
		if zone.FailureDomain == "" && vs.cfg.Labels.Zone != "" {
			return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID)
		}
		return nil
	})
	if err != nil {
		klog.Errorf("Get zone for node %s: %s", nodeName, err)
		return cloudprovider.Zone{}, err
	}
	return zone, nil
}
// GetZoneByNodeName is not implemented by this provider: it always returns the
// zero-value Zone together with cloudprovider.NotImplemented.
func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
	var emptyZone cloudprovider.Zone
	return emptyZone, cloudprovider.NotImplemented
}
// GetZoneByProviderID is not implemented by this provider: it always returns
// the zero-value Zone together with cloudprovider.NotImplemented.
func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
	var emptyZone cloudprovider.Zone
	return emptyZone, cloudprovider.NotImplemented
}