Merge pull request #3160 from dandsager1/STORM-3530

STORM-3530 Improve scheduling failure messages
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index cba27cd..75ae8c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -36,7 +35,6 @@
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
@@ -374,22 +372,29 @@
         }
 
         String getRemainingRequiredResourcesMessage() {
+            StringBuilder message = new StringBuilder();
+            message.append("After evicting lower priority topologies: ");
+
+            NormalizedResourceOffer clusterRemainingAvailableResources = new NormalizedResourceOffer();
+            clusterRemainingAvailableResources.add(clusterAvailableResources);
+            clusterRemainingAvailableResources.remove(topologyScheduledResources);
+
             double memoryNeeded = remainingRequiredTopologyMemory;
             double cpuNeeded = remainingRequiredTopologyResources.getTotalCpu();
-            StringBuilder message = new StringBuilder();
-            if (memoryNeeded > 0 || cpuNeeded > 0) {
-                if (memoryNeeded > 0) {
-                    message.append(memoryNeeded).append(" MB ");
-                }
-                if (cpuNeeded > 0) {
-                    message.append(cpuNeeded).append("% CPU ");
-                }
-                if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
-                    message.append(" ");
-                    message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
-                    message.append(" ");
-                }
-                message.append("needed even after evicting lower priority topologies. ");
+            if (memoryNeeded > 0) {
+                message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalMemoryMb()).append(" MB). ");
+            }
+            if (cpuNeeded > 0) {
+                message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalCpu()).append(" % CPU).");
+            }
+            if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
+                message.append(" Additional Topology Required Resources: ");
+                message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
+                message.append(" Cluster Available Resources: ");
+                message.append(clusterRemainingAvailableResources.getNormalizedResources().toString());
+                message.append(".  ");
             }
             return message.toString();
         }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..2b5afe0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -126,7 +126,7 @@
         } else {
             String comp = td.getExecutorToComponent().get(exec);
             NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-            LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+            LOG.warn("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
             return false;
         }
     }