STORM-3530 Improve scheduling failure messsages
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index cba27cd..40902ae 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -36,7 +35,6 @@
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
@@ -160,6 +158,7 @@
         TopologySchedulingResources topologySchedulingResources = new TopologySchedulingResources(workingState, td);
         final IStrategy finalRasStrategy = rasStrategy;
         for (int i = 0; i < maxSchedulingAttempts; i++) {
+            finalRasStrategy.clearSchedulingFailText();
             SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
             try {
                 SchedulingResult result = null;
@@ -219,8 +218,8 @@
 
                         if (!evictedSomething) {
                             StringBuilder message = new StringBuilder();
-                            message.append("Not enough resources to schedule ");
-                            message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage());
+                            message.append("Not enough resources to schedule - ");
+                            message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage(finalRasStrategy));
                             message.append(result.getErrorMessage());
                             markFailedTopology(topologySubmitter, cluster, td, message.toString());
                             return;
@@ -373,23 +372,31 @@
             return scheduledTopologyMemory;
         }
 
-        String getRemainingRequiredResourcesMessage() {
+        String getRemainingRequiredResourcesMessage(IStrategy rasStrategy) {
+            StringBuilder message = new StringBuilder();
+            message.append("After evicting lower priority topologies: ");
+            message.append(rasStrategy.getSchedulingFailText()).append("\n");
+
+            NormalizedResourceOffer clusterRemainingAvailableResources = new NormalizedResourceOffer();
+            clusterRemainingAvailableResources.add(clusterAvailableResources);
+            clusterRemainingAvailableResources.remove(topologyScheduledResources);
+
             double memoryNeeded = remainingRequiredTopologyMemory;
             double cpuNeeded = remainingRequiredTopologyResources.getTotalCpu();
-            StringBuilder message = new StringBuilder();
-            if (memoryNeeded > 0 || cpuNeeded > 0) {
-                if (memoryNeeded > 0) {
-                    message.append(memoryNeeded).append(" MB ");
-                }
-                if (cpuNeeded > 0) {
-                    message.append(cpuNeeded).append("% CPU ");
-                }
-                if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
-                    message.append(" ");
-                    message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
-                    message.append(" ");
-                }
-                message.append("needed even after evicting lower priority topologies. ");
+            if (memoryNeeded > 0) {
+                message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalMemoryMb()).append(" MB). ");
+            }
+            if (cpuNeeded > 0) {
+                message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalCpu()).append(" % CPU).");
+            }
+            if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
+                message.append(" Additional Topology Required Resources: ");
+                message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
+                message.append(" Cluster Available Resources: ");
+                message.append(clusterRemainingAvailableResources.getNormalizedResources().toString());
+                message.append(".  ");
             }
             return message.toString();
         }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..a9c187d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -60,6 +60,15 @@
     private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
     private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
     protected RasNodes nodes;
+    private String schedulingFailText;
+
+    public String getSchedulingFailText() {
+        return schedulingFailText;
+    }
+
+    public void clearSchedulingFailText() {
+        schedulingFailText = "";
+    }
 
     @VisibleForTesting
     void prepare(Cluster cluster) {
@@ -126,7 +135,9 @@
         } else {
             String comp = td.getExecutorToComponent().get(exec);
             NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-            LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+            schedulingFailText = "Not Enough Resources to schedule Task "
+                    + exec + " - " + comp + " " + requestedResources;
+            LOG.error(schedulingFailText);
             return false;
         }
     }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index d246b30..1ed11fd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -41,4 +41,10 @@
      *     successful.
      */
     SchedulingResult schedule(Cluster schedulingState, TopologyDetails td);
+
+    // Get topology scheduling failure text.
+    String getSchedulingFailText();
+
+    // Clear topology scheduling failure text.
+    void clearSchedulingFailText();
 }