[YUNIKORN-1041] Fix binding of dynamic volumes to pod (#359)

Closes: #359
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ee3ae2e..e857128 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -31,6 +31,7 @@
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/cache"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 
 	"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
 	schedulercache "github.com/apache/incubator-yunikorn-k8shim/pkg/cache/external"
@@ -356,19 +357,68 @@
 				zap.String("podName", pod.Name))
 		} else {
 			log.Logger().Info("Binding Pod Volumes", zap.String("podName", pod.Name))
-			boundClaims, claimsToBind, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+			boundClaims, claimsToBind, unboundClaimsImmediate, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
 			if err != nil {
+				log.Logger().Error("Failed to get pod volumes",
+					zap.String("podName", assumedPod.Name),
+					zap.Error(err))
+				return err
+			}
+			if len(unboundClaimsImmediate) > 0 {
+				err = fmt.Errorf("pod %s has unbound immediate claims", pod.Name)
+				log.Logger().Error("Pod has unbound immediate claims",
+					zap.String("podName", assumedPod.Name),
+					zap.Error(err))
 				return err
 			}
 			node, err := ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
 			if err != nil {
+				log.Logger().Error("Failed to get node info",
+					zap.String("podName", assumedPod.Name),
+					zap.String("nodeName", assumedPod.Spec.NodeName),
+					zap.Error(err))
 				return err
 			}
-			volumes, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node)
+			volumes, reasons, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node)
 			if err != nil {
+				log.Logger().Error("Failed to find pod volumes",
+					zap.String("podName", assumedPod.Name),
+					zap.String("nodeName", assumedPod.Spec.NodeName),
+					zap.Int("claimsToBind", len(claimsToBind)),
+					zap.Error(err))
 				return err
 			}
-			return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes)
+			if len(reasons) > 0 {
+				sReasons := make([]string, 0)
+				for _, reason := range reasons {
+					sReasons = append(sReasons, string(reason))
+				}
+				sReason := strings.Join(sReasons, ", ")
+				err = fmt.Errorf("pod %s has conflicting volume claims: %s", pod.Name, sReason)
+				log.Logger().Error("Pod has conflicting volume claims",
+					zap.String("podName", assumedPod.Name),
+					zap.String("nodeName", assumedPod.Spec.NodeName),
+					zap.Int("claimsToBind", len(claimsToBind)),
+					zap.Error(err))
+				return err
+			}
+			if volumes.StaticBindings == nil {
+				// convert nil to empty array
+				volumes.StaticBindings = make([]*scheduling.BindingInfo, 0)
+			}
+			if volumes.DynamicProvisions == nil {
+				// convert nil to empty array
+				volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0)
+			}
+			err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes)
+			if err != nil {
+				log.Logger().Error("Failed to bind pod volumes",
+					zap.String("podName", assumedPod.Name),
+					zap.String("nodeName", assumedPod.Spec.NodeName),
+					zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
+					zap.Int("staticBindings", len(volumes.StaticBindings)))
+				return err
+			}
 		}
 	}
 	return nil