blob: 6a536cda1d2bb2a5c75cbb47f2dd14dc2d264740 [file] [log] [blame]
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"k8s.io/klog"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
ioutil "k8s.io/kubernetes/pkg/volume/util"
)
type csiBlockMapper struct {
k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin
driverName string
specName string
volumeID string
readOnly bool
spec *volume.Spec
podUID types.UID
volumeInfo map[string]string
}
var _ volume.BlockVolumeMapper = &csiBlockMapper{}
// GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
// Example: plugins/kubernetes.io/csi/volumeDevices/{pvname}/dev
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
return dir, nil
}
// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
func (m *csiBlockMapper) getStagingPath() string {
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID)
}
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
func (m *csiBlockMapper) getPublishPath() string {
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID)
}
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, kstrings.EscapeQualifiedNameForDisk(csiPluginName))
specName := m.specName
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
return path, specName
}
// stageVolumeForBlock stages a block volume to stagingPath
func (m *csiBlockMapper) stageVolumeForBlock(
ctx context.Context,
csi csiClient,
accessMode v1.PersistentVolumeAccessMode,
csiSource *v1.CSIPersistentVolumeSource,
attachment *storage.VolumeAttachment,
) (string, error) {
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
stagingPath := m.getStagingPath()
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
if err != nil {
klog.Error(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
return "", err
}
if !stageUnstageSet {
klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
return "", nil
}
publishVolumeInfo := attachment.Status.AttachmentMetadata
nodeStageSecrets := map[string]string{}
if csiSource.NodeStageSecretRef != nil {
nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef)
if err != nil {
return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v",
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
}
}
// Creating a stagingPath directory before call to NodeStageVolume
if err := os.MkdirAll(stagingPath, 0750); err != nil {
klog.Error(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingPath, err))
return "", err
}
klog.V(4).Info(log("blockMapper.stageVolumeForBlock created stagingPath directory successfully [%s]", stagingPath))
// Request to stage a block volume to stagingPath.
// Expected implementation for driver is creating driver specific resource on stagingPath and
// attaching the block volume to the node.
err = csi.NodeStageVolume(ctx,
csiSource.VolumeHandle,
publishVolumeInfo,
stagingPath,
fsTypeBlockName,
accessMode,
nodeStageSecrets,
csiSource.VolumeAttributes)
if err != nil {
klog.Error(log("blockMapper.stageVolumeForBlock failed: %v", err))
return "", err
}
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
return stagingPath, nil
}
// publishVolumeForBlock publishes a block volume to publishPath
func (m *csiBlockMapper) publishVolumeForBlock(
ctx context.Context,
csi csiClient,
accessMode v1.PersistentVolumeAccessMode,
csiSource *v1.CSIPersistentVolumeSource,
attachment *storage.VolumeAttachment,
stagingPath string,
) (string, error) {
klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
publishVolumeInfo := attachment.Status.AttachmentMetadata
nodePublishSecrets := map[string]string{}
var err error
if csiSource.NodePublishSecretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
if err != nil {
klog.Errorf("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v",
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
return "", err
}
}
publishPath := m.getPublishPath()
// Setup a parent directory for publishPath before call to NodePublishVolume
publishDir := filepath.Dir(publishPath)
if err := os.MkdirAll(publishDir, 0750); err != nil {
klog.Error(log("blockMapper.publishVolumeForBlock failed to create dir %s: %v", publishDir, err))
return "", err
}
klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir))
// Request to publish a block volume to publishPath.
// Expectation for driver is to place a block volume on the publishPath, by bind-mounting the device file on the publishPath or
// creating device file on the publishPath.
// Parent directory for publishPath is created by k8s, but driver is responsible for creating publishPath itself.
// If driver doesn't implement NodeStageVolume, attaching the block volume to the node may be done, here.
err = csi.NodePublishVolume(
ctx,
m.volumeID,
m.readOnly,
stagingPath,
publishPath,
accessMode,
publishVolumeInfo,
csiSource.VolumeAttributes,
nodePublishSecrets,
fsTypeBlockName,
[]string{},
)
if err != nil {
klog.Errorf(log("blockMapper.publishVolumeForBlock failed: %v", err))
return "", err
}
return publishPath, nil
}
// SetUpDevice ensures the device is attached returns path where the device is located.
func (m *csiBlockMapper) SetUpDevice() (string, error) {
if !m.plugin.blockEnabled {
return "", errors.New("CSIBlockVolume feature not enabled")
}
klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
// Get csiSource from spec
if m.spec == nil {
klog.Error(log("blockMapper.SetUpDevice spec is nil"))
return "", fmt.Errorf("spec is nil")
}
csiSource, err := getCSISourceFromSpec(m.spec)
if err != nil {
klog.Error(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
return "", err
}
// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
nodeName := string(m.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
return "", err
}
if attachment == nil {
klog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID))
return "", errors.New("no existing VolumeAttachment found")
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if m.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Call NodeStageVolume
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment)
if err != nil {
return "", err
}
// Call NodePublishVolume
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath)
if err != nil {
return "", err
}
return publishPath, nil
}
func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
return ioutil.MapBlockVolume(devicePath, globalMapPath, volumeMapPath, volumeMapName, podUID)
}
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
// unpublishVolumeForBlock unpublishes a block volume from publishPath
func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error {
// Request to unpublish a block volume from publishPath.
// Expectation for driver is to remove block volume from the publishPath, by unmounting bind-mounted device file
// or deleting device file.
// Driver is responsible for deleting publishPath itself.
// If driver doesn't implement NodeUnstageVolume, detaching the block volume from the node may be done, here.
if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil {
klog.Error(log("blockMapper.unpublishVolumeForBlock failed: %v", err))
return err
}
klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath))
return nil
}
// unstageVolumeForBlock unstages a block volume from stagingPath
func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error {
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
if err != nil {
klog.Error(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
return err
}
if !stageUnstageSet {
klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ..."))
return nil
}
// Request to unstage a block volume from stagingPath.
// Expected implementation for driver is removing driver specific resource in stagingPath and
// detaching the block volume from the node.
if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil {
klog.Errorf(log("blockMapper.unstageVolumeForBlock failed: %v", err))
return err
}
klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath))
// Remove stagingPath directory and its contents
if err := os.RemoveAll(stagingPath); err != nil {
klog.Error(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err))
return err
}
return nil
}
// TearDownDevice removes traces of the SetUpDevice.
func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
if !m.plugin.blockEnabled {
return errors.New("CSIBlockVolume feature not enabled")
}
klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Call NodeUnpublishVolume
publishPath := m.getPublishPath()
if _, err := os.Stat(publishPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath)
if err != nil {
return err
}
}
// Call NodeUnstageVolume
stagingPath := m.getStagingPath()
if _, err := os.Stat(stagingPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
} else {
return err
}
} else {
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath)
if err != nil {
return err
}
}
return nil
}